aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-11-27 20:19:54 +0300
committergvit <gvit@ydb.tech>2023-11-27 20:45:02 +0300
commit683fcca92aa79893d3d0c44a36d27c10c807c300 (patch)
tree12d63f682f52716b29bc1d194e2b88c41674188f
parent8dd796d6a2f607d86ff7c85931b580768ac6d5ab (diff)
downloadydb-683fcca92aa79893d3d0c44a36d27c10c807c300.tar.gz
Revert commit rXXXXXX, YQL-16986 wide combiner with spilling prototype
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp7
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp5
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp464
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp54
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.h3
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller.cpp61
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller.h27
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller_adapter.h88
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp73
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/ya.make1
-rw-r--r--ydb/library/yql/minikql/computation/ya.make.inc1
26 files changed, 64 insertions, 735 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index db011ef730..e7c3b4372d 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -162,13 +162,8 @@ private:
LOG_T("Task runner. Inject watermark " << watermark);
TaskRunner->SetWatermarkIn(watermark);
}
+
res = TaskRunner->Run();
-// Uncomment me to test YQL-16986 prototype
-// Delete me after YQL-16988
-// if (ERunStatus::PendingInput == res){
-// //very poor man waiting for spiller async operation completion
-// Schedule(TDuration::MilliSeconds(1), new TEvContinueRun(THashSet<ui32>(ev->Get()->InputChannels), ev->Get()->MemLimit));
-// }
}
for (auto& channelId : inputMap) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp
index 0710fb8ed2..2a4e0bb1bc 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp
@@ -20,9 +20,8 @@ public:
if (item.IsFinish()) {
return list.Release();
}
- if (!item.IsYield()) {
- list = ctx.HolderFactory.Append(list.Release(), item.Release());
- }
+ MKQL_ENSURE(!item.IsYield(), "Unexpected flow status!");
+ list = ctx.HolderFactory.Append(list.Release(), item.Release());
}
}
#ifndef MKQL_DISABLE_CODEGEN
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index d54cf0cb34..73c9077fb4 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -7,8 +7,6 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_stats_registry.h>
#include <ydb/library/yql/minikql/defs.h>
-#include <ydb/library/yql/minikql/computation/mkql_spiller.h>
-#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h>
#include <ydb/library/yql/utils/cast.h>
#include <util/string/cast.h>
@@ -108,16 +106,6 @@ struct TCombinerNodes {
std::transform(itemsOnResults.cbegin(), itemsOnResults.cend(), PasstroughtItems.begin(), [](const TPasstroughtMap::value_type& v) { return v.has_value(); });
}
- bool IsInputItemNodeUsed(size_t i) const {
- return (ItemNodes[i]->GetDependencesCount() > 0U || PasstroughtItems[i]);
- }
-
- NUdf::TUnboxedValue* GetUsedInputItemNodePtrOrNull(TComputationContext& ctx, size_t i) const {
- return IsInputItemNodeUsed(i) ?
- &ItemNodes[i]->RefValue(ctx) :
- nullptr;
- }
-
void ExtractKey(TComputationContext& ctx, NUdf::TUnboxedValue** values, NUdf::TUnboxedValue* keys) const {
std::for_each(ItemNodes.cbegin(), ItemNodes.cend(), [&](IComputationExternalNode* item) {
if (const auto pointer = *values++)
@@ -307,367 +295,6 @@ private:
TStates States;
};
-class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
- typedef TComputationValue<TSpillingSupportState> TBase;
-public:
- enum class EOperatingMode {
- InMemory, // try to perform all processing in memory
- SpillState, //after switching to the spilling mode we spill collected state and free allocated memory
- SpillData, //store incoming data
- ProcessSpilled //restore and process spilled data
- };
- TSpillingSupportState(
- TMemoryUsageInfo* memInfo,
- const TCombinerNodes& nodes, IComputationWideFlowNode *const flow, size_t wideFieldsIndex,
- const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth,
- const THashFunc& hash, const TEqualsFunc& equal
- )
- : TBase(memInfo)
- , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
- , Nodes(nodes)
- , Flow(flow)
- , WideFieldsIndex(wideFieldsIndex)
- , UsedInputItemType(usedInputItemType)
- , KeyAndStateType(keyAndStateType)
- , KeyWidth(keyWidth)
- , Hasher(hash)
- , Mode(EOperatingMode::InMemory)
- {
- BufferForUsedInputItems.reserve(usedInputItemType->GetElementsCount());
- BufferForKeyAnsState.reserve(keyAndStateType->GetElementsCount());
- }
- ~TSpillingSupportState() {
- }
-
- EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
- if (HasRunningSpillingAsyncOperation()) {
- return EFetchResult::Yield;
- }
- while (true) {
- switch(GetMode()) {
- case EOperatingMode::InMemory: {
- auto r = DoCalculateInMemory(ctx, output);
- if (GetMode() == TSpillingSupportState::EOperatingMode::InMemory) {
- return r;
- }
- break;
- }
- case EOperatingMode::SpillState: {
- SpillState();
- if (GetMode() == EOperatingMode::SpillState) {
- return AsyncWrite();
- }
- MKQL_ENSURE(GetMode() == EOperatingMode::SpillData, "Internal logic error");
- break;
- }
- case EOperatingMode::SpillData: {
- SpillData(ctx);
- if (GetMode() == EOperatingMode::SpillData) {
- return AsyncWrite();
- }
- break;
- }
- case EOperatingMode::ProcessSpilled: {
- return ProcessSpilledData(ctx, output);
- }
- }
- }
- Y_UNREACHABLE();
- }
-private:
- EFetchResult AsyncWrite() {
- MKQL_ENSURE(!AsyncReadOperation.has_value(), "Internal logic error");
- MKQL_ENSURE(AsyncWriteOperation.has_value(), "Internal logic error");
- MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error");
- //AsyncWriteOperation.Subscribe() //TODO YQL-16988
- return EFetchResult::Yield;
- }
- EFetchResult AsyncRead() {
- MKQL_ENSURE(!AsyncWriteOperation.has_value(), "Internal logic error");
- MKQL_ENSURE(AsyncReadOperation.has_value(), "Internal logic error");
- MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error");
- //AsyncReadOperation.Subscribe() //TODO YQL-16988
- return EFetchResult::Yield;
- }
- EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
- auto **fields = ctx.WideFields.data() + WideFieldsIndex;
- while (InputDataFetchResult != EFetchResult::Finish) {
- for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) {
- fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
- }
- InputDataFetchResult = Flow->FetchValues(ctx, fields);
- switch (InputDataFetchResult) {
- case EFetchResult::One: {
- Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue));
- const bool isNew = InMemoryProcessingState.TasteIt();
- Nodes.ProcessItem(
- ctx,
- isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue),
- static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat)
- );
- if (IsSwitchToSpillingModeCondition()) {
- SwitchMode(EOperatingMode::SpillState);
- return EFetchResult::Yield;
- }
- continue;
- }
- case EFetchResult::Yield:
- return EFetchResult::Yield;
- case EFetchResult::Finish:
- break;
- }
- }
- if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) {
- Nodes.FinishItem(ctx, values, output);
- return EFetchResult::One;
- }
- return EFetchResult::Finish;
- }
-
- void SpillState() {
- // mini State machine that:
- //1. Spills InMemoryProcessingState
- //2. Finilizes buckets
- //During each phase asyn operation may be returned
- //On next call execution is continued
- MKQL_ENSURE(!AsyncReadOperation, "Internal logic error");
- if (AsyncWriteOperation) {
- MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error");
- SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue());
- AsyncWriteOperation = std::nullopt;
- }
- while (const auto keyAndState = InMemoryProcessingState.Extract()) {
- auto hash = Hasher(keyAndState); //Hasher uses only key for hashing
- auto bucketId = hash % SpilledBucketCount;
- auto& bucket = SpilledBuckets[bucketId];
- MKQL_ENSURE(bucket.InitialState, "Internal logic error");
- if (auto chunkIsBeingStored = bucket.InitialState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()})) {
- CurrentAsyncOperationBucketId = bucketId;
- AsyncWriteOperation = chunkIsBeingStored;
- return;
- }
- for (size_t i = 0; i != KeyAndStateType->GetElementsCount(); ++i) {
- //releasing values stored in unsafe TUnboxedValue buffer
- keyAndState[i].UnRef();
- }
- }
- if (!FinalizingSpillerBuckets) {
- FinalizingSpillerBuckets = true;
- CurrentAsyncOperationBucketId = 0;
- }
- for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) {
- if (auto chunkIsBeingStored = SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->FinishWriting()) {
- AsyncWriteOperation = chunkIsBeingStored;
- return;
- }
- }
- InMemoryProcessingState.IsEmpty();
- SwitchMode(EOperatingMode::SpillData);
- }
-
- void SpillData(TComputationContext& ctx) {
- if (AsyncWriteOperation) {
- MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error");
- SpilledBuckets[CurrentAsyncOperationBucketId].Data->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue());
- AsyncWriteOperation = std::nullopt;
- }
- while (!FinalizingSpillerBuckets) {
- auto **fields = ctx.WideFields.data() + WideFieldsIndex;
- for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) {
- fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
- }
- InputDataFetchResult = Flow->FetchValues(ctx, fields);
- switch (InputDataFetchResult) {
- case EFetchResult::One: {
- BufferForKeyAnsState.resize(KeyWidth); //use only key part of the buffer
- Nodes.ExtractKey(ctx, fields, BufferForKeyAnsState.data());
- auto hash = Hasher(BufferForKeyAnsState.data());
- BufferForKeyAnsState.resize(0); //for freeing allocated key value asap
- auto bucketId = hash % SpilledBucketCount;
- MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error");
- for (size_t i = 0; i != Nodes.ItemNodes.size(); ++i) {
- if (fields[i]) {
- BufferForUsedInputItems.push_back(*fields[i]);
- }
- }
- auto &bucket = SpilledBuckets[bucketId];
- const auto chunkIsBeingStored = bucket.Data->WriteWideItem(BufferForUsedInputItems);
- BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
- if (chunkIsBeingStored) {
- CurrentAsyncOperationBucketId = bucketId;
- AsyncWriteOperation = chunkIsBeingStored;
- return;
- }
- break;
- }
- case EFetchResult::Yield:
- return;
- case EFetchResult::Finish:
- CurrentAsyncOperationBucketId = 0;
- FinalizingSpillerBuckets = true;
- break;
- }
- }
- for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) {
- auto& bucket = SpilledBuckets[CurrentAsyncOperationBucketId];
- if (auto chunkIsBeingStored = bucket.Data->FinishWriting()) {
- AsyncWriteOperation = chunkIsBeingStored;
- return;
- }
- }
- SwitchMode(EOperatingMode::ProcessSpilled);
- }
-
- EFetchResult ProcessSpilledData(TComputationContext& ctx, NUdf::TUnboxedValue*const* output){
- if (AsyncReadOperation) {
- MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error");
- if (RecoverState) {
- SpilledBuckets[0].InitialState->AsyncReadCompleted(
- AsyncReadOperation->ExtractValue(), ctx.HolderFactory);
- } else {
- SpilledBuckets[0].Data->AsyncReadCompleted(
- AsyncReadOperation->ExtractValue(), ctx.HolderFactory);
- }
- AsyncWriteOperation = std::nullopt;
- }
- while(!SpilledBuckets.empty()){
- auto& bucket = SpilledBuckets.front();
- //recover spilled state
- while(!bucket.InitialState->Empty()) {
- RecoverState = true;
- BufferForKeyAnsState.resize(KeyAndStateType->GetElementsCount());
- AsyncReadOperation = bucket.InitialState->ExtractWideItem(BufferForKeyAnsState);
- if (AsyncReadOperation) {
- BufferForKeyAnsState.resize(0);
- return AsyncRead();
- }
- for (size_t i = 0; i != KeyWidth; ++i) {
- //jumping into unsafe world, refusing ownership
- static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Tongue[i]) = std::move(BufferForKeyAnsState[i]);
- }
- auto isNew = InMemoryProcessingState.TasteIt();
- MKQL_ENSURE(isNew, "Internal logic error");
- for (size_t i = KeyWidth; i != KeyAndStateType->GetElementsCount(); ++i) {
- //jumping into unsafe world, refusing ownership
- static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Throat[i - KeyWidth]) = std::move(BufferForKeyAnsState[i]);
- }
- BufferForKeyAnsState.resize(0);
- }
- //process spilled data
- while(!bucket.Data->Empty()) {
- RecoverState = false;
- BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
- AsyncReadOperation = bucket.Data->ExtractWideItem(BufferForUsedInputItems);
- if (AsyncReadOperation) {
- return AsyncRead();
- }
- auto **fields = ctx.WideFields.data() + WideFieldsIndex;
- for (size_t i = 0, j = 0; i != Nodes.ItemNodes.size(); ++i) {
- if (Nodes.IsInputItemNodeUsed(i)) {
- fields[i] = &BufferForUsedInputItems[j++];
- } else {
- fields[i] = nullptr;
- }
- }
- Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue));
- const bool isNew = InMemoryProcessingState.TasteIt();
- Nodes.ProcessItem(
- ctx,
- isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue),
- static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat)
- );
- BufferForKeyAnsState.resize(0);
- }
- if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) {
- Nodes.FinishItem(ctx, values, output);
- return EFetchResult::One;
- }
- InMemoryProcessingState.IsEmpty();
- SpilledBuckets.pop_front();
- }
- return EFetchResult::Finish;
- }
-
- EOperatingMode GetMode() const {
- return Mode;
- }
-
- bool HasRunningSpillingAsyncOperation() const {
- return
- (AsyncWriteOperation.has_value() && !AsyncWriteOperation->HasValue()) ||
- (AsyncReadOperation.has_value() && !AsyncReadOperation->HasValue());
- }
-
- void SwitchMode(EOperatingMode mode) {
- MKQL_ENSURE(!AsyncReadOperation, "Internal logic error");
- MKQL_ENSURE(!AsyncWriteOperation, "Internal logic error");
- switch(mode) {
- case EOperatingMode::InMemory:
- MKQL_ENSURE(false, "Internal logic error");
- break;
- case EOperatingMode::SpillState: {
- MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
- MKQL_ENSURE(!Spiller,"Internal logic error");
- Spiller = MakeSpiller();
- SpilledBuckets.resize(SpilledBucketCount);
- for (auto &b: SpilledBuckets) {
- b.InitialState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, KeyAndStateType, 1 << 20);
- }
- FinalizingSpillerBuckets = false;
- break;
- }
- case EOperatingMode::SpillData:
- MKQL_ENSURE(EOperatingMode::SpillState == Mode, "Internal logic error");
- MKQL_ENSURE(Spiller,"Internal logic error");
- MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
- for (auto &b: SpilledBuckets) {
- b.Data = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, UsedInputItemType, 1 << 20);
- }
- InputDataFetchResult = EFetchResult::Yield;
- FinalizingSpillerBuckets = false;
- break;
- case EOperatingMode::ProcessSpilled:
- MKQL_ENSURE(EOperatingMode::SpillData == Mode, "Internal logic error");
- MKQL_ENSURE(Spiller,"Internal logic error");
- MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
- break;
- }
- Mode = mode;
- }
-
- bool IsSwitchToSpillingModeCondition() const {
- //TODO implement me
- return false;
- }
-
-private:
- TState InMemoryProcessingState;
- const TCombinerNodes& Nodes;
- IComputationWideFlowNode* const Flow;
- const size_t WideFieldsIndex;
- const TMultiType* const UsedInputItemType;
- const TMultiType* const KeyAndStateType;
- const size_t KeyWidth;
- THashFunc const Hasher;
- EOperatingMode Mode;
- bool FinalizingSpillerBuckets; // sub mode for SpillState and SpillData
- bool RecoverState; //sub mode for ProcessSpilledData
-
- struct TSpilledBucket {
- std::unique_ptr<TWideUnboxedValuesSpillerAdapter> InitialState; //state collected before switching to spilling mode
- std::unique_ptr<TWideUnboxedValuesSpillerAdapter> Data; //data collected in spilling mode
- };
- static constexpr size_t SpilledBucketCount = 4;
- std::deque<TSpilledBucket> SpilledBuckets;
- ISpiller::TPtr Spiller;
- EFetchResult InputDataFetchResult;
- size_t CurrentAsyncOperationBucketId;
- std::optional<NThreading::TFuture<ISpiller::TKey>> AsyncWriteOperation;
- std::optional<NThreading::TFuture<TRope>> AsyncReadOperation;
- TUnboxedValueVector BufferForUsedInputItems;
- TUnboxedValueVector BufferForKeyAnsState;
-};
-
#ifndef MKQL_DISABLE_CODEGEN
class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> {
private:
@@ -1110,31 +737,48 @@ class TWideLastCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideL
{
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWrapper>;
public:
- TWideLastCombinerWrapper(
- TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
- TCombinerNodes&& nodes,
- const TMultiType* usedInputItemType,
- TKeyTypes&& keyTypes,
- const TMultiType* keyAndStateType
- )
+ TWideLastCombinerWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TCombinerNodes&& nodes, TKeyTypes&& keyTypes)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
, Flow(flow)
, Nodes(std::move(nodes))
, KeyTypes(std::move(keyTypes))
- , UsedInputItemType(usedInputItemType)
- , KeyAndStateType(keyAndStateType)
, WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size()))
{}
- EFetchResult DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- if (!stateValue.HasValue()) {
- MakeSpillingSupportState(ctx, stateValue);
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ if (!state.HasValue()) {
+ MakeState(ctx, state);
+ }
+
+ if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) {
+ auto **fields = ctx.WideFields.data() + WideFieldsIndex;
+
+ while (EFetchResult::Finish != ptr->InputStatus) {
+ for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
+ if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i])
+ fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx);
+
+ switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
+ case EFetchResult::One:
+ Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
+ Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
+ continue;
+ case EFetchResult::Yield:
+ return EFetchResult::Yield;
+ case EFetchResult::Finish:
+ break;
+ }
+ }
+
+ if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) {
+ Nodes.FinishItem(ctx, values, output);
+ return EFetchResult::One;
+ }
+
+ return EFetchResult::Finish;
}
- auto *const state = static_cast<TSpillingSupportState *>(stateValue.AsBoxed().Get());
- return state->DoCalculate(ctx, output);
+ Y_UNREACHABLE();
}
-
#ifndef MKQL_DISABLE_CODEGEN
ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
auto& context = ctx.Codegen.GetContext();
@@ -1374,14 +1018,6 @@ private:
ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes))
);
#endif
- }
- void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
- state = ctx.HolderFactory.Create<TSpillingSupportState>(Nodes, Flow, WideFieldsIndex,
- UsedInputItemType, KeyAndStateType,
- Nodes.KeyNodes.size(),
- TMyValueHasher(KeyTypes),
- TMyValueEqual(KeyTypes)
- );
}
void RegisterDependencies() const final {
@@ -1396,8 +1032,7 @@ private:
IComputationWideFlowNode *const Flow;
const TCombinerNodes Nodes;
const TKeyTypes KeyTypes;
- const TMultiType* const UsedInputItemType;
- const TMultiType* const KeyAndStateType;
+
const ui32 WideFieldsIndex;
#ifndef MKQL_DISABLE_CODEGEN
@@ -1435,8 +1070,8 @@ private:
template<bool Last>
IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() >= (Last ? 3U : 4U), "Expected more arguments.");
- const auto inputType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType());
- const auto inputWidth = GetWideComponentsCount(inputType);
+
+ const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
@@ -1447,12 +1082,10 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF
const auto stateSize = AS_VALUE(TDataLiteral, callable.GetInput(++index))->AsValue().Get<ui32>();
++index += inputWidth;
- std::vector<TType*> keyAndStateItemTypes;
- keyAndStateItemTypes.reserve(keysSize + stateSize);
+
TKeyTypes keyTypes;
keyTypes.reserve(keysSize);
for (ui32 i = index; i < index + keysSize; ++i) {
- keyAndStateItemTypes.push_back(callable.GetInput(i).GetStaticType());
bool optional;
keyTypes.emplace_back(*UnpackOptionalData(callable.GetInput(i).GetStaticType(), optional)->GetDataSlot(), optional);
}
@@ -1463,10 +1096,7 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF
index += keysSize;
nodes.InitResultNodes.reserve(stateSize);
- for (size_t i = 0; i != stateSize; ++i) {
- keyAndStateItemTypes.push_back(callable.GetInput(index).GetStaticType());
- nodes.InitResultNodes.push_back(LocateNode(ctx.NodeLocator, callable, index++));
- }
+ std::generate_n(std::back_inserter(nodes.InitResultNodes), stateSize, [&](){ return LocateNode(ctx.NodeLocator, callable, index++); } );
index += stateSize;
nodes.UpdateResultNodes.reserve(stateSize);
@@ -1495,21 +1125,9 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF
nodes.BuildMaps();
if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
- if constexpr (Last) {
- const auto inputItemTypes = GetWideComponents(inputType);
- std::vector<TType*> usedInputItemTypes;
- usedInputItemTypes.reserve(inputItemTypes.size());
- for(size_t i = 0; i != inputItemTypes.size(); ++i) {
- if (nodes.IsInputItemNodeUsed(i)) {
- usedInputItemTypes.push_back(inputItemTypes[i]);
- }
- }
- return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes),
- TMultiType::Create(usedInputItemTypes.size(), usedInputItemTypes.data(), ctx.Env),
- std::move(keyTypes),
- TMultiType::Create(keyAndStateItemTypes.size(), keyAndStateItemTypes.data(), ctx.Env)
- );
- } else {
+ if constexpr (Last)
+ return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes));
+ else {
const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>();
if (EGraphPerProcess::Single == ctx.GraphPerProcess)
return new TWideCombinerWrapper<true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit);
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
index 78dbb12569..8e49947c0d 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
@@ -923,18 +923,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
const auto graph = setup.BuildGraph(pgmReturn);
const auto iterator = graph->GetValue().GetListIterator();
NUdf::TUnboxedValue item;
- THashSet<TString> expectedResults = {
- "key one",
- "very long value 2 / key two",
- "very long key one",
- "very long value 8 / very long value 7 / very long value 6"
- };
- while(!expectedResults.empty()) {
- UNIT_ASSERT(iterator.Next(item));
- TString val{item.AsStringRef()};
- UNIT_ASSERT(expectedResults.contains(val));
- expectedResults.erase(val);
- }
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "key one");
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two");
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "very long key one");
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6");
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
@@ -1002,18 +998,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
const auto graph = setup.BuildGraph(pgmReturn);
const auto iterator = graph->GetValue().GetListIterator();
NUdf::TUnboxedValue item;
- THashSet<TString> expectedResults = {
- "very long value 1 / key one / very long value 1 / key one",
- "very long value 3 / key two / very long value 2 / key two",
- "very long value 4 / very long key one / very long value 4 / very long key one",
- "very long value 9 / very long key two / very long value 5 / very long key two"
- };
- while (!expectedResults.empty()) {
- UNIT_ASSERT(iterator.Next(item));
- TString val{item.AsStringRef()};
- UNIT_ASSERT(expectedResults.contains(val));
- expectedResults.erase(val);
- }
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one");
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two");
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one");
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two");
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
@@ -1078,17 +1070,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
const auto graph = setup.BuildGraph(pgmReturn);
const auto iterator = graph->GetValue().GetListIterator();
- THashSet<TString> expectedResults = {
- "key one / value 2 / value 1 / value 5 / value 4",
- "key two / value 4 / value 3 / value 3 / value 2",
- };
NUdf::TUnboxedValue item;
- while (!expectedResults.empty()) {
- UNIT_ASSERT(iterator.Next(item));
- TString val{item.AsStringRef()};
- UNIT_ASSERT(expectedResults.contains(val));
- expectedResults.erase(val);
- }
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4");
+ UNIT_ASSERT(iterator.Next(item));
+ UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2");
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt
index 344f00fd5f..364f602ce5 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt
@@ -48,7 +48,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
index 344f00fd5f..364f602ce5 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
@@ -48,7 +48,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
index f0a0459bfc..2c5ea976b0 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
@@ -49,7 +49,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
index f0a0459bfc..2c5ea976b0 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
@@ -49,7 +49,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
index 344f00fd5f..364f602ce5 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
@@ -48,7 +48,6 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
index d8dee524b2..f1948212bd 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
@@ -85,9 +85,6 @@ public:
size_t PackedSizeEstimate() const {
return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0);
}
- bool IsEmpty() const {
- return !ItemCount_;
- }
void Clear();
TRope Finish();
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.cpp b/ydb/library/yql/minikql/computation/mkql_spiller.cpp
deleted file mode 100644
index 803dac9c56..0000000000
--- a/ydb/library/yql/minikql/computation/mkql_spiller.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-#include "mkql_spiller.h"
-#include <library/cpp/threading/future/core/future.h>
-#include <util/system/thread.h>
-#include <unordered_map>
-
-namespace NKikimr::NMiniKQL {
-
-//Dummy synchronous in-memory spiller
-class TDummySpiller: public ISpiller{
-public:
- TDummySpiller()
- : NextKey(0)
- {}
-
- NThreading::TFuture<TKey> Put(TRope&& blob) override {
- auto promise = NThreading::NewPromise<ISpiller::TKey>();
-// TThread t([this, blob = std::move(blob), p = std::move(p)]() {
-// WriteAsync(blob, p);
-// });
-// t.Detach();
-// return f;
- auto key = NextKey;
- Storage[key] = blob;
- NextKey++;
- promise.SetValue(key);
- return promise.GetFuture();;
- }
- std::optional<NThreading::TFuture<TRope>> Get(TKey key) override {
- auto promise = NThreading::NewPromise<TRope>();
- if (auto it = Storage.find(key); it != Storage.end()) {
- promise.SetValue(it->second);
- return promise.GetFuture();;
- } else {
- return std::nullopt;
- }
- }
- std::optional<NThreading::TFuture<TRope>> Extract(TKey key) override {
- auto promise = NThreading::NewPromise<TRope>();
- if (auto it = Storage.find(key); it != Storage.end()) {
- promise.SetValue(std::move(it->second));
- Storage.erase(it);
- return promise.GetFuture();;
- } else {
- return std::nullopt;
- }
- }
- NThreading::TFuture<void> Delete(TKey key) override {
- auto promise = NThreading::NewPromise<void>();
- promise.SetValue();
- Storage.erase(key);
- return promise.GetFuture();
- }
-private:
- ISpiller::TKey NextKey;
- std::unordered_map<ISpiller::TKey, TRope> Storage;
-};
-ISpiller::TPtr MakeSpiller() {
- return std::make_shared<TDummySpiller>();
-}
-
-} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.h b/ydb/library/yql/minikql/computation/mkql_spiller.h
deleted file mode 100644
index d9663d5985..0000000000
--- a/ydb/library/yql/minikql/computation/mkql_spiller.h
+++ /dev/null
@@ -1,27 +0,0 @@
-#pragma once
-#include <library/cpp/threading/future/core/future.h>
-#include <library/cpp/actors/util/rope.h>
-
-
-namespace NKikimr::NMiniKQL {
-
-struct ISpiller {
- using TPtr = std::shared_ptr<ISpiller>;
- virtual ~ISpiller(){}
- using TKey = ui64;
- virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0;
-
- ///\return
- /// nullopt for absent keys
- /// TFuture
- virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0;
- virtual NThreading::TFuture<void> Delete(TKey) = 0;
- ///Get + Delete
- ///Stored value may be moved to feature
- virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0;
-
-};
-
-ISpiller::TPtr MakeSpiller();
-
-}//namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h b/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h
deleted file mode 100644
index b3bba486bd..0000000000
--- a/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h
+++ /dev/null
@@ -1,88 +0,0 @@
-#pragma once
-#include "mkql_spiller.h"
-#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
-
-
-namespace NKikimr::NMiniKQL {
-
-///Stores and loads very long sequences of TMultiType UVs
-///Can split sequences into chunks
-///Sends chunks to ISplitter and keeps assigned keys
-///When all data is written switches to read mode. Switching back to writing mode is not supported
-///Provides an interface for sequential read (like forward iterator)
-///When interaction with ISpiller is required, Write and Read operations return a Future
-class TWideUnboxedValuesSpillerAdapter {
-public:
- TWideUnboxedValuesSpillerAdapter(ISpiller::TPtr spiller, const TMultiType* type, size_t sizeLimit)
- : Spiller(spiller)
- , ItemType(type)
- , SizeLimit(sizeLimit)
- , Packer(type)
- {
- }
-
- /// Write wide UV item
- /// \returns
- /// - nullopt, if thee values are accumulated
- /// - TFeature, if the values are being stored asynchronously and a caller must wait until async operation ends
- /// In this case a caller must wait operation completion and call StoreCompleted.
- /// Design note: not using Subscribe on a Future here to avoid possible race condition
- std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) {
- Packer.AddWideItem(wideItem.data(), wideItem.size());
- if(Packer.PackedSizeEstimate() > SizeLimit) {
- return Spiller->Put(std::move(Packer.Finish()));
- } else {
- return std::nullopt;
- }
- }
-
- std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() {
- if (Packer.IsEmpty())
- return std::nullopt;
- return Spiller->Put(std::move(Packer.Finish()));
- }
-
- void AsyncWriteCompleted(ISpiller::TKey key) {
- StoredChunks.push_back(key);
- }
-
- //Extracting interface
- bool Empty() const {
- return StoredChunks.empty() && !CurrentBatch;
- }
- std::optional<NThreading::TFuture<TRope>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) {
- MKQL_ENSURE(!Empty(), "Internal logic error");
- if (CurrentBatch) {
- auto row = CurrentBatch->Head();
- for (size_t i = 0; i != wideItem.size(); ++i) {
- wideItem[i] = row[i];
- }
- CurrentBatch->Pop();
- if (CurrentBatch->empty()) {
- CurrentBatch = std::nullopt;
- }
- return std::nullopt;
- } else {
- auto r = Spiller->Get(StoredChunks.front());
- StoredChunks.pop_front();
- return r;
- }
- }
-
- void AsyncReadCompleted(TRope&& rope,const THolderFactory& holderFactory ) {
- //Implementation detail: deserialization is performed in a processing thread
- TUnboxedValueBatch batch(ItemType);
- Packer.UnpackBatch(std::move(rope), holderFactory, batch);
- CurrentBatch = std::move(batch);
- }
-
-private:
- ISpiller::TPtr Spiller;
- const TMultiType* const ItemType;
- const size_t SizeLimit;
- TValuePackerTransport<false> Packer;
- std::deque<ISpiller::TKey> StoredChunks;
- std::optional<TUnboxedValueBatch> CurrentBatch;
-};
-
-}//namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp b/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
deleted file mode 100644
index 4f3953623c..0000000000
--- a/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-#include <ydb/library/yql/minikql/mkql_node.h>
-#include <ydb/library/yql/minikql/mkql_node_cast.h>
-#include <ydb/library/yql/minikql/mkql_program_builder.h>
-#include <ydb/library/yql/minikql/mkql_function_registry.h>
-#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
-#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
-#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
-#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h>
-#include <ydb/library/yql/minikql/computation/mkql_spiller.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <vector>
-#include <utility>
-#include <algorithm>
-
-namespace NKikimr::NMiniKQL {
-
-Y_UNIT_TEST_SUITE(TestWideSpillerAdapter) {
- constexpr size_t itemWidth = 3;
- constexpr size_t chunkSize = 100;
- Y_UNIT_TEST(TestWriteExtractZeroItems) {
- TScopedAlloc alloc(__LOCATION__);
- TTypeEnvironment env(alloc);
- const auto spiller = MakeSpiller();
- std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env));
- TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize);
- auto r = wideUVSpiller.FinishWriting();
- UNIT_ASSERT(!r.has_value());
- UNIT_ASSERT(wideUVSpiller.Empty());
- }
-
- Y_UNIT_TEST(TestWriteExtract) {
- TScopedAlloc alloc(__LOCATION__);
- TMemoryUsageInfo memInfo("test");
- THolderFactory holderFactory(alloc.Ref(), memInfo);
- TTypeEnvironment env(alloc);
- const auto spiller = MakeSpiller();
- std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env));
- TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize);
- std::vector<NUdf::TUnboxedValue> wideValue(itemWidth);
- constexpr size_t rowCount = chunkSize*10+3;
- for (size_t row = 0; row != rowCount; ++row) {
- for(size_t i = 0; i != itemWidth; ++i) {
- wideValue[i] = NUdf::TUnboxedValuePod(NUdf::TStringValue(TStringBuilder() << "Long enough string: " << row * 10 + i));
- }
- if (auto r = wideUVSpiller.WriteWideItem(wideValue)) {
- wideUVSpiller.AsyncWriteCompleted(r->GetValue());
- }
- }
- auto r = wideUVSpiller.FinishWriting();
- if (r) {
- wideUVSpiller.AsyncWriteCompleted(r->GetValue());
- }
-
- wideUVSpiller.AsyncWriteCompleted(r->GetValue());
- for (size_t row = 0; row != rowCount; ++row) {
- UNIT_ASSERT(!wideUVSpiller.Empty());
- if (auto r = wideUVSpiller.ExtractWideItem(wideValue)) {
- wideUVSpiller.AsyncReadCompleted(r->ExtractValue(), holderFactory);
- r = wideUVSpiller.ExtractWideItem(wideValue);
- UNIT_ASSERT(!r.has_value());
- }
- for (size_t i = 0; i != itemWidth; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(wideValue[i].AsStringRef()) , TStringBuilder() << "Long enough string: " << row * 10 + i);
- }
- }
- UNIT_ASSERT(!wideUVSpiller.Empty());
- }
-}
-
-} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt
index 185dd5f550..2f9609b5a2 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt
@@ -44,7 +44,6 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt
index 185dd5f550..2f9609b5a2 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt
@@ -44,7 +44,6 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt
index 38fd1bb839..e0b561b565 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt
@@ -45,7 +45,6 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt
index 38fd1bb839..e0b561b565 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt
@@ -45,7 +45,6 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt
index 185dd5f550..2f9609b5a2 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt
@@ -44,7 +44,6 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt
index 8da2d8478b..61f2fe7012 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt
@@ -41,7 +41,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt
index 440182defa..01dba3f99d 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt
@@ -42,7 +42,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt
index 653994d7c9..67eac9242a 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt
@@ -45,7 +45,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt
index fc6f85827f..436ad6ceac 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt
@@ -46,7 +46,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt
index bfc123d31d..4244aa874b 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt
@@ -35,7 +35,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/ya.make b/ydb/library/yql/minikql/computation/ut/ya.make
index 819a216fd6..b4c2ff659f 100644
--- a/ydb/library/yql/minikql/computation/ut/ya.make
+++ b/ydb/library/yql/minikql/computation/ut/ya.make
@@ -21,7 +21,6 @@ SRCS(
mkql_computation_pattern_cache_ut.cpp
mkql_validate_ut.cpp
mkql_value_builder_ut.cpp
- mkql_spiller_adapter_ut.cpp
presort_ut.cpp
)
diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc
index 11e1687462..424a78ebdd 100644
--- a/ydb/library/yql/minikql/computation/ya.make.inc
+++ b/ydb/library/yql/minikql/computation/ya.make.inc
@@ -13,7 +13,6 @@ SRCS(
mkql_computation_node_impl.cpp
mkql_computation_node_pack.cpp
mkql_computation_node_pack_impl.cpp
- mkql_spiller.cpp
mkql_custom_list.cpp
mkql_llvm_base.cpp
mkql_validate.cpp