aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-11-23 21:41:31 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-11-23 22:10:37 +0300
commitf330971f637115368558cea161aaae7a51a11924 (patch)
treec7af80ec1495072e61156827e3dff89773c44818
parent7e0b5a3f515f85f3b50d33b17de6b9acf8cee790 (diff)
downloadydb-f330971f637115368558cea161aaae7a51a11924.tar.gz
YQL-16986 wide combiner with spilling prototype
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp7
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp5
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp464
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp54
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.h3
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller.cpp61
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller.h27
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller_adapter.h88
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp73
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/computation/ut/ya.make1
-rw-r--r--ydb/library/yql/minikql/computation/ya.make.inc1
26 files changed, 735 insertions, 64 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index e7c3b4372d..db011ef730 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -162,8 +162,13 @@ private:
LOG_T("Task runner. Inject watermark " << watermark);
TaskRunner->SetWatermarkIn(watermark);
}
-
res = TaskRunner->Run();
+// Uncomment me to test YQL-16986 prototype
+// Delete me after YQL-16988
+// if (ERunStatus::PendingInput == res){
+// //very poor man waiting for spiller async operation completion
+// Schedule(TDuration::MilliSeconds(1), new TEvContinueRun(THashSet<ui32>(ev->Get()->InputChannels), ev->Get()->MemLimit));
+// }
}
for (auto& channelId : inputMap) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp
index 2a4e0bb1bc..0710fb8ed2 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp
@@ -20,8 +20,9 @@ public:
if (item.IsFinish()) {
return list.Release();
}
- MKQL_ENSURE(!item.IsYield(), "Unexpected flow status!");
- list = ctx.HolderFactory.Append(list.Release(), item.Release());
+ if (!item.IsYield()) {
+ list = ctx.HolderFactory.Append(list.Release(), item.Release());
+ }
}
}
#ifndef MKQL_DISABLE_CODEGEN
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index 73c9077fb4..d54cf0cb34 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -7,6 +7,8 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_stats_registry.h>
#include <ydb/library/yql/minikql/defs.h>
+#include <ydb/library/yql/minikql/computation/mkql_spiller.h>
+#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h>
#include <ydb/library/yql/utils/cast.h>
#include <util/string/cast.h>
@@ -106,6 +108,16 @@ struct TCombinerNodes {
std::transform(itemsOnResults.cbegin(), itemsOnResults.cend(), PasstroughtItems.begin(), [](const TPasstroughtMap::value_type& v) { return v.has_value(); });
}
+ bool IsInputItemNodeUsed(size_t i) const {
+ return (ItemNodes[i]->GetDependencesCount() > 0U || PasstroughtItems[i]);
+ }
+
+ NUdf::TUnboxedValue* GetUsedInputItemNodePtrOrNull(TComputationContext& ctx, size_t i) const {
+ return IsInputItemNodeUsed(i) ?
+ &ItemNodes[i]->RefValue(ctx) :
+ nullptr;
+ }
+
void ExtractKey(TComputationContext& ctx, NUdf::TUnboxedValue** values, NUdf::TUnboxedValue* keys) const {
std::for_each(ItemNodes.cbegin(), ItemNodes.cend(), [&](IComputationExternalNode* item) {
if (const auto pointer = *values++)
@@ -295,6 +307,367 @@ private:
TStates States;
};
+class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
+ typedef TComputationValue<TSpillingSupportState> TBase;
+public:
+ enum class EOperatingMode {
+ InMemory, // try to perform all processing in memory
+ SpillState, //after switching to the spilling mode we spill collected state and free allocated memory
+ SpillData, //store incoming data
+ ProcessSpilled //restore and process spilled data
+ };
+ TSpillingSupportState(
+ TMemoryUsageInfo* memInfo,
+ const TCombinerNodes& nodes, IComputationWideFlowNode *const flow, size_t wideFieldsIndex,
+ const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth,
+ const THashFunc& hash, const TEqualsFunc& equal
+ )
+ : TBase(memInfo)
+ , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
+ , Nodes(nodes)
+ , Flow(flow)
+ , WideFieldsIndex(wideFieldsIndex)
+ , UsedInputItemType(usedInputItemType)
+ , KeyAndStateType(keyAndStateType)
+ , KeyWidth(keyWidth)
+ , Hasher(hash)
+ , Mode(EOperatingMode::InMemory)
+ {
+ BufferForUsedInputItems.reserve(usedInputItemType->GetElementsCount());
+ BufferForKeyAnsState.reserve(keyAndStateType->GetElementsCount());
+ }
+ ~TSpillingSupportState() {
+ }
+
+ EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
+ if (HasRunningSpillingAsyncOperation()) {
+ return EFetchResult::Yield;
+ }
+ while (true) {
+ switch(GetMode()) {
+ case EOperatingMode::InMemory: {
+ auto r = DoCalculateInMemory(ctx, output);
+ if (GetMode() == TSpillingSupportState::EOperatingMode::InMemory) {
+ return r;
+ }
+ break;
+ }
+ case EOperatingMode::SpillState: {
+ SpillState();
+ if (GetMode() == EOperatingMode::SpillState) {
+ return AsyncWrite();
+ }
+ MKQL_ENSURE(GetMode() == EOperatingMode::SpillData, "Internal logic error");
+ break;
+ }
+ case EOperatingMode::SpillData: {
+ SpillData(ctx);
+ if (GetMode() == EOperatingMode::SpillData) {
+ return AsyncWrite();
+ }
+ break;
+ }
+ case EOperatingMode::ProcessSpilled: {
+ return ProcessSpilledData(ctx, output);
+ }
+ }
+ }
+ Y_UNREACHABLE();
+ }
+private:
+ EFetchResult AsyncWrite() {
+ MKQL_ENSURE(!AsyncReadOperation.has_value(), "Internal logic error");
+ MKQL_ENSURE(AsyncWriteOperation.has_value(), "Internal logic error");
+ MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error");
+ //AsyncWriteOperation.Subscribe() //TODO YQL-16988
+ return EFetchResult::Yield;
+ }
+ EFetchResult AsyncRead() {
+ MKQL_ENSURE(!AsyncWriteOperation.has_value(), "Internal logic error");
+ MKQL_ENSURE(AsyncReadOperation.has_value(), "Internal logic error");
+ MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error");
+ //AsyncReadOperation.Subscribe() //TODO YQL-16988
+ return EFetchResult::Yield;
+ }
+ EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
+ auto **fields = ctx.WideFields.data() + WideFieldsIndex;
+ while (InputDataFetchResult != EFetchResult::Finish) {
+ for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) {
+ fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
+ }
+ InputDataFetchResult = Flow->FetchValues(ctx, fields);
+ switch (InputDataFetchResult) {
+ case EFetchResult::One: {
+ Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue));
+ const bool isNew = InMemoryProcessingState.TasteIt();
+ Nodes.ProcessItem(
+ ctx,
+ isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue),
+ static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat)
+ );
+ if (IsSwitchToSpillingModeCondition()) {
+ SwitchMode(EOperatingMode::SpillState);
+ return EFetchResult::Yield;
+ }
+ continue;
+ }
+ case EFetchResult::Yield:
+ return EFetchResult::Yield;
+ case EFetchResult::Finish:
+ break;
+ }
+ }
+ if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) {
+ Nodes.FinishItem(ctx, values, output);
+ return EFetchResult::One;
+ }
+ return EFetchResult::Finish;
+ }
+
+ void SpillState() {
+ // mini State machine that:
+ //1. Spills InMemoryProcessingState
+ //2. Finilizes buckets
+ //During each phase asyn operation may be returned
+ //On next call execution is continued
+ MKQL_ENSURE(!AsyncReadOperation, "Internal logic error");
+ if (AsyncWriteOperation) {
+ MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error");
+ SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue());
+ AsyncWriteOperation = std::nullopt;
+ }
+ while (const auto keyAndState = InMemoryProcessingState.Extract()) {
+ auto hash = Hasher(keyAndState); //Hasher uses only key for hashing
+ auto bucketId = hash % SpilledBucketCount;
+ auto& bucket = SpilledBuckets[bucketId];
+ MKQL_ENSURE(bucket.InitialState, "Internal logic error");
+ if (auto chunkIsBeingStored = bucket.InitialState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()})) {
+ CurrentAsyncOperationBucketId = bucketId;
+ AsyncWriteOperation = chunkIsBeingStored;
+ return;
+ }
+ for (size_t i = 0; i != KeyAndStateType->GetElementsCount(); ++i) {
+ //releasing values stored in unsafe TUnboxedValue buffer
+ keyAndState[i].UnRef();
+ }
+ }
+ if (!FinalizingSpillerBuckets) {
+ FinalizingSpillerBuckets = true;
+ CurrentAsyncOperationBucketId = 0;
+ }
+ for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) {
+ if (auto chunkIsBeingStored = SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->FinishWriting()) {
+ AsyncWriteOperation = chunkIsBeingStored;
+ return;
+ }
+ }
+ InMemoryProcessingState.IsEmpty();
+ SwitchMode(EOperatingMode::SpillData);
+ }
+
+ void SpillData(TComputationContext& ctx) {
+ if (AsyncWriteOperation) {
+ MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error");
+ SpilledBuckets[CurrentAsyncOperationBucketId].Data->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue());
+ AsyncWriteOperation = std::nullopt;
+ }
+ while (!FinalizingSpillerBuckets) {
+ auto **fields = ctx.WideFields.data() + WideFieldsIndex;
+ for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) {
+ fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
+ }
+ InputDataFetchResult = Flow->FetchValues(ctx, fields);
+ switch (InputDataFetchResult) {
+ case EFetchResult::One: {
+ BufferForKeyAnsState.resize(KeyWidth); //use only key part of the buffer
+ Nodes.ExtractKey(ctx, fields, BufferForKeyAnsState.data());
+ auto hash = Hasher(BufferForKeyAnsState.data());
+ BufferForKeyAnsState.resize(0); //for freeing allocated key value asap
+ auto bucketId = hash % SpilledBucketCount;
+ MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error");
+ for (size_t i = 0; i != Nodes.ItemNodes.size(); ++i) {
+ if (fields[i]) {
+ BufferForUsedInputItems.push_back(*fields[i]);
+ }
+ }
+ auto &bucket = SpilledBuckets[bucketId];
+ const auto chunkIsBeingStored = bucket.Data->WriteWideItem(BufferForUsedInputItems);
+ BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
+ if (chunkIsBeingStored) {
+ CurrentAsyncOperationBucketId = bucketId;
+ AsyncWriteOperation = chunkIsBeingStored;
+ return;
+ }
+ break;
+ }
+ case EFetchResult::Yield:
+ return;
+ case EFetchResult::Finish:
+ CurrentAsyncOperationBucketId = 0;
+ FinalizingSpillerBuckets = true;
+ break;
+ }
+ }
+ for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) {
+ auto& bucket = SpilledBuckets[CurrentAsyncOperationBucketId];
+ if (auto chunkIsBeingStored = bucket.Data->FinishWriting()) {
+ AsyncWriteOperation = chunkIsBeingStored;
+ return;
+ }
+ }
+ SwitchMode(EOperatingMode::ProcessSpilled);
+ }
+
+ EFetchResult ProcessSpilledData(TComputationContext& ctx, NUdf::TUnboxedValue*const* output){
+ if (AsyncReadOperation) {
+ MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error");
+ if (RecoverState) {
+ SpilledBuckets[0].InitialState->AsyncReadCompleted(
+ AsyncReadOperation->ExtractValue(), ctx.HolderFactory);
+ } else {
+ SpilledBuckets[0].Data->AsyncReadCompleted(
+ AsyncReadOperation->ExtractValue(), ctx.HolderFactory);
+ }
+ AsyncWriteOperation = std::nullopt;
+ }
+ while(!SpilledBuckets.empty()){
+ auto& bucket = SpilledBuckets.front();
+ //recover spilled state
+ while(!bucket.InitialState->Empty()) {
+ RecoverState = true;
+ BufferForKeyAnsState.resize(KeyAndStateType->GetElementsCount());
+ AsyncReadOperation = bucket.InitialState->ExtractWideItem(BufferForKeyAnsState);
+ if (AsyncReadOperation) {
+ BufferForKeyAnsState.resize(0);
+ return AsyncRead();
+ }
+ for (size_t i = 0; i != KeyWidth; ++i) {
+ //jumping into unsafe world, refusing ownership
+ static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Tongue[i]) = std::move(BufferForKeyAnsState[i]);
+ }
+ auto isNew = InMemoryProcessingState.TasteIt();
+ MKQL_ENSURE(isNew, "Internal logic error");
+ for (size_t i = KeyWidth; i != KeyAndStateType->GetElementsCount(); ++i) {
+ //jumping into unsafe world, refusing ownership
+ static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Throat[i - KeyWidth]) = std::move(BufferForKeyAnsState[i]);
+ }
+ BufferForKeyAnsState.resize(0);
+ }
+ //process spilled data
+ while(!bucket.Data->Empty()) {
+ RecoverState = false;
+ BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
+ AsyncReadOperation = bucket.Data->ExtractWideItem(BufferForUsedInputItems);
+ if (AsyncReadOperation) {
+ return AsyncRead();
+ }
+ auto **fields = ctx.WideFields.data() + WideFieldsIndex;
+ for (size_t i = 0, j = 0; i != Nodes.ItemNodes.size(); ++i) {
+ if (Nodes.IsInputItemNodeUsed(i)) {
+ fields[i] = &BufferForUsedInputItems[j++];
+ } else {
+ fields[i] = nullptr;
+ }
+ }
+ Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue));
+ const bool isNew = InMemoryProcessingState.TasteIt();
+ Nodes.ProcessItem(
+ ctx,
+ isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue),
+ static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat)
+ );
+ BufferForKeyAnsState.resize(0);
+ }
+ if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) {
+ Nodes.FinishItem(ctx, values, output);
+ return EFetchResult::One;
+ }
+ InMemoryProcessingState.IsEmpty();
+ SpilledBuckets.pop_front();
+ }
+ return EFetchResult::Finish;
+ }
+
+ EOperatingMode GetMode() const {
+ return Mode;
+ }
+
+ bool HasRunningSpillingAsyncOperation() const {
+ return
+ (AsyncWriteOperation.has_value() && !AsyncWriteOperation->HasValue()) ||
+ (AsyncReadOperation.has_value() && !AsyncReadOperation->HasValue());
+ }
+
+ void SwitchMode(EOperatingMode mode) {
+ MKQL_ENSURE(!AsyncReadOperation, "Internal logic error");
+ MKQL_ENSURE(!AsyncWriteOperation, "Internal logic error");
+ switch(mode) {
+ case EOperatingMode::InMemory:
+ MKQL_ENSURE(false, "Internal logic error");
+ break;
+ case EOperatingMode::SpillState: {
+ MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
+ MKQL_ENSURE(!Spiller,"Internal logic error");
+ Spiller = MakeSpiller();
+ SpilledBuckets.resize(SpilledBucketCount);
+ for (auto &b: SpilledBuckets) {
+ b.InitialState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, KeyAndStateType, 1 << 20);
+ }
+ FinalizingSpillerBuckets = false;
+ break;
+ }
+ case EOperatingMode::SpillData:
+ MKQL_ENSURE(EOperatingMode::SpillState == Mode, "Internal logic error");
+ MKQL_ENSURE(Spiller,"Internal logic error");
+ MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
+ for (auto &b: SpilledBuckets) {
+ b.Data = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, UsedInputItemType, 1 << 20);
+ }
+ InputDataFetchResult = EFetchResult::Yield;
+ FinalizingSpillerBuckets = false;
+ break;
+ case EOperatingMode::ProcessSpilled:
+ MKQL_ENSURE(EOperatingMode::SpillData == Mode, "Internal logic error");
+ MKQL_ENSURE(Spiller,"Internal logic error");
+ MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
+ break;
+ }
+ Mode = mode;
+ }
+
+ bool IsSwitchToSpillingModeCondition() const {
+ //TODO implement me
+ return false;
+ }
+
+private:
+ TState InMemoryProcessingState;
+ const TCombinerNodes& Nodes;
+ IComputationWideFlowNode* const Flow;
+ const size_t WideFieldsIndex;
+ const TMultiType* const UsedInputItemType;
+ const TMultiType* const KeyAndStateType;
+ const size_t KeyWidth;
+ THashFunc const Hasher;
+ EOperatingMode Mode;
+ bool FinalizingSpillerBuckets; // sub mode for SpillState and SpillData
+ bool RecoverState; //sub mode for ProcessSpilledData
+
+ struct TSpilledBucket {
+ std::unique_ptr<TWideUnboxedValuesSpillerAdapter> InitialState; //state collected before switching to spilling mode
+ std::unique_ptr<TWideUnboxedValuesSpillerAdapter> Data; //data collected in spilling mode
+ };
+ static constexpr size_t SpilledBucketCount = 4;
+ std::deque<TSpilledBucket> SpilledBuckets;
+ ISpiller::TPtr Spiller;
+ EFetchResult InputDataFetchResult;
+ size_t CurrentAsyncOperationBucketId;
+ std::optional<NThreading::TFuture<ISpiller::TKey>> AsyncWriteOperation;
+ std::optional<NThreading::TFuture<TRope>> AsyncReadOperation;
+ TUnboxedValueVector BufferForUsedInputItems;
+ TUnboxedValueVector BufferForKeyAnsState;
+};
+
#ifndef MKQL_DISABLE_CODEGEN
class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> {
private:
@@ -737,48 +1110,31 @@ class TWideLastCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideL
{
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWrapper>;
public:
- TWideLastCombinerWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TCombinerNodes&& nodes, TKeyTypes&& keyTypes)
+ TWideLastCombinerWrapper(
+ TComputationMutables& mutables,
+ IComputationWideFlowNode* flow,
+ TCombinerNodes&& nodes,
+ const TMultiType* usedInputItemType,
+ TKeyTypes&& keyTypes,
+ const TMultiType* keyAndStateType
+ )
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
, Flow(flow)
, Nodes(std::move(nodes))
, KeyTypes(std::move(keyTypes))
+ , UsedInputItemType(usedInputItemType)
+ , KeyAndStateType(keyAndStateType)
, WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size()))
{}
- EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- if (!state.HasValue()) {
- MakeState(ctx, state);
- }
-
- if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) {
- auto **fields = ctx.WideFields.data() + WideFieldsIndex;
-
- while (EFetchResult::Finish != ptr->InputStatus) {
- for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
- if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i])
- fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx);
-
- switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
- case EFetchResult::One:
- Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
- Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
- continue;
- case EFetchResult::Yield:
- return EFetchResult::Yield;
- case EFetchResult::Finish:
- break;
- }
- }
-
- if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) {
- Nodes.FinishItem(ctx, values, output);
- return EFetchResult::One;
- }
-
- return EFetchResult::Finish;
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ if (!stateValue.HasValue()) {
+ MakeSpillingSupportState(ctx, stateValue);
}
- Y_UNREACHABLE();
+ auto *const state = static_cast<TSpillingSupportState *>(stateValue.AsBoxed().Get());
+ return state->DoCalculate(ctx, output);
}
+
#ifndef MKQL_DISABLE_CODEGEN
ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
auto& context = ctx.Codegen.GetContext();
@@ -1018,6 +1374,14 @@ private:
ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes))
);
#endif
+ }
+ void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
+ state = ctx.HolderFactory.Create<TSpillingSupportState>(Nodes, Flow, WideFieldsIndex,
+ UsedInputItemType, KeyAndStateType,
+ Nodes.KeyNodes.size(),
+ TMyValueHasher(KeyTypes),
+ TMyValueEqual(KeyTypes)
+ );
}
void RegisterDependencies() const final {
@@ -1032,7 +1396,8 @@ private:
IComputationWideFlowNode *const Flow;
const TCombinerNodes Nodes;
const TKeyTypes KeyTypes;
-
+ const TMultiType* const UsedInputItemType;
+ const TMultiType* const KeyAndStateType;
const ui32 WideFieldsIndex;
#ifndef MKQL_DISABLE_CODEGEN
@@ -1070,8 +1435,8 @@ private:
template<bool Last>
IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() >= (Last ? 3U : 4U), "Expected more arguments.");
-
- const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
+ const auto inputType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType());
+ const auto inputWidth = GetWideComponentsCount(inputType);
const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
@@ -1082,10 +1447,12 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF
const auto stateSize = AS_VALUE(TDataLiteral, callable.GetInput(++index))->AsValue().Get<ui32>();
++index += inputWidth;
-
+ std::vector<TType*> keyAndStateItemTypes;
+ keyAndStateItemTypes.reserve(keysSize + stateSize);
TKeyTypes keyTypes;
keyTypes.reserve(keysSize);
for (ui32 i = index; i < index + keysSize; ++i) {
+ keyAndStateItemTypes.push_back(callable.GetInput(i).GetStaticType());
bool optional;
keyTypes.emplace_back(*UnpackOptionalData(callable.GetInput(i).GetStaticType(), optional)->GetDataSlot(), optional);
}
@@ -1096,7 +1463,10 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF
index += keysSize;
nodes.InitResultNodes.reserve(stateSize);
- std::generate_n(std::back_inserter(nodes.InitResultNodes), stateSize, [&](){ return LocateNode(ctx.NodeLocator, callable, index++); } );
+ for (size_t i = 0; i != stateSize; ++i) {
+ keyAndStateItemTypes.push_back(callable.GetInput(index).GetStaticType());
+ nodes.InitResultNodes.push_back(LocateNode(ctx.NodeLocator, callable, index++));
+ }
index += stateSize;
nodes.UpdateResultNodes.reserve(stateSize);
@@ -1125,9 +1495,21 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF
nodes.BuildMaps();
if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
- if constexpr (Last)
- return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes));
- else {
+ if constexpr (Last) {
+ const auto inputItemTypes = GetWideComponents(inputType);
+ std::vector<TType*> usedInputItemTypes;
+ usedInputItemTypes.reserve(inputItemTypes.size());
+ for(size_t i = 0; i != inputItemTypes.size(); ++i) {
+ if (nodes.IsInputItemNodeUsed(i)) {
+ usedInputItemTypes.push_back(inputItemTypes[i]);
+ }
+ }
+ return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes),
+ TMultiType::Create(usedInputItemTypes.size(), usedInputItemTypes.data(), ctx.Env),
+ std::move(keyTypes),
+ TMultiType::Create(keyAndStateItemTypes.size(), keyAndStateItemTypes.data(), ctx.Env)
+ );
+ } else {
const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>();
if (EGraphPerProcess::Single == ctx.GraphPerProcess)
return new TWideCombinerWrapper<true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit);
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
index 8e49947c0d..78dbb12569 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
@@ -923,14 +923,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
const auto graph = setup.BuildGraph(pgmReturn);
const auto iterator = graph->GetValue().GetListIterator();
NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "key one");
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two");
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "very long key one");
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6");
+ THashSet<TString> expectedResults = {
+ "key one",
+ "very long value 2 / key two",
+ "very long key one",
+ "very long value 8 / very long value 7 / very long value 6"
+ };
+ while(!expectedResults.empty()) {
+ UNIT_ASSERT(iterator.Next(item));
+ TString val{item.AsStringRef()};
+ UNIT_ASSERT(expectedResults.contains(val));
+ expectedResults.erase(val);
+ }
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
@@ -998,14 +1002,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
const auto graph = setup.BuildGraph(pgmReturn);
const auto iterator = graph->GetValue().GetListIterator();
NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one");
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two");
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one");
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two");
+ THashSet<TString> expectedResults = {
+ "very long value 1 / key one / very long value 1 / key one",
+ "very long value 3 / key two / very long value 2 / key two",
+ "very long value 4 / very long key one / very long value 4 / very long key one",
+ "very long value 9 / very long key two / very long value 5 / very long key two"
+ };
+ while (!expectedResults.empty()) {
+ UNIT_ASSERT(iterator.Next(item));
+ TString val{item.AsStringRef()};
+ UNIT_ASSERT(expectedResults.contains(val));
+ expectedResults.erase(val);
+ }
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
@@ -1070,11 +1078,17 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
const auto graph = setup.BuildGraph(pgmReturn);
const auto iterator = graph->GetValue().GetListIterator();
+ THashSet<TString> expectedResults = {
+ "key one / value 2 / value 1 / value 5 / value 4",
+ "key two / value 4 / value 3 / value 3 / value 2",
+ };
NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4");
- UNIT_ASSERT(iterator.Next(item));
- UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2");
+ while (!expectedResults.empty()) {
+ UNIT_ASSERT(iterator.Next(item));
+ TString val{item.AsStringRef()};
+ UNIT_ASSERT(expectedResults.contains(val));
+ expectedResults.erase(val);
+ }
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt
index 364f602ce5..344f00fd5f 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt
@@ -48,6 +48,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
index 364f602ce5..344f00fd5f 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
@@ -48,6 +48,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
index 2c5ea976b0..f0a0459bfc 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
@@ -49,6 +49,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
index 2c5ea976b0..f0a0459bfc 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
@@ -49,6 +49,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
index 364f602ce5..344f00fd5f 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
@@ -48,6 +48,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
index f1948212bd..d8dee524b2 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
@@ -85,6 +85,9 @@ public:
size_t PackedSizeEstimate() const {
return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0);
}
+ bool IsEmpty() const {
+ return !ItemCount_;
+ }
void Clear();
TRope Finish();
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.cpp b/ydb/library/yql/minikql/computation/mkql_spiller.cpp
new file mode 100644
index 0000000000..803dac9c56
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_spiller.cpp
@@ -0,0 +1,61 @@
+#include "mkql_spiller.h"
+#include <library/cpp/threading/future/core/future.h>
+#include <util/system/thread.h>
+#include <unordered_map>
+
+namespace NKikimr::NMiniKQL {
+
+//Dummy synchronous in-memory spiller
+class TDummySpiller: public ISpiller{
+public:
+ TDummySpiller()
+ : NextKey(0)
+ {}
+
+ NThreading::TFuture<TKey> Put(TRope&& blob) override {
+ auto promise = NThreading::NewPromise<ISpiller::TKey>();
+// TThread t([this, blob = std::move(blob), p = std::move(p)]() {
+// WriteAsync(blob, p);
+// });
+// t.Detach();
+// return f;
+ auto key = NextKey;
+ Storage[key] = blob;
+ NextKey++;
+ promise.SetValue(key);
+ return promise.GetFuture();;
+ }
+ std::optional<NThreading::TFuture<TRope>> Get(TKey key) override {
+ auto promise = NThreading::NewPromise<TRope>();
+ if (auto it = Storage.find(key); it != Storage.end()) {
+ promise.SetValue(it->second);
+ return promise.GetFuture();;
+ } else {
+ return std::nullopt;
+ }
+ }
+ std::optional<NThreading::TFuture<TRope>> Extract(TKey key) override {
+ auto promise = NThreading::NewPromise<TRope>();
+ if (auto it = Storage.find(key); it != Storage.end()) {
+ promise.SetValue(std::move(it->second));
+ Storage.erase(it);
+ return promise.GetFuture();;
+ } else {
+ return std::nullopt;
+ }
+ }
+ NThreading::TFuture<void> Delete(TKey key) override {
+ auto promise = NThreading::NewPromise<void>();
+ promise.SetValue();
+ Storage.erase(key);
+ return promise.GetFuture();
+ }
+private:
+ ISpiller::TKey NextKey;
+ std::unordered_map<ISpiller::TKey, TRope> Storage;
+};
+ISpiller::TPtr MakeSpiller() {
+ return std::make_shared<TDummySpiller>();
+}
+
+} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.h b/ydb/library/yql/minikql/computation/mkql_spiller.h
new file mode 100644
index 0000000000..d9663d5985
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_spiller.h
@@ -0,0 +1,27 @@
+#pragma once
+#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/actors/util/rope.h>
+
+
+namespace NKikimr::NMiniKQL {
+
+struct ISpiller {
+ using TPtr = std::shared_ptr<ISpiller>;
+ virtual ~ISpiller(){}
+ using TKey = ui64;
+ virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0;
+
+ ///\return
+ /// nullopt for absent keys
+ /// TFuture
+ virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0;
+ virtual NThreading::TFuture<void> Delete(TKey) = 0;
+ ///Get + Delete
+ ///Stored value may be moved to feature
+ virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0;
+
+};
+
+ISpiller::TPtr MakeSpiller();
+
+}//namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h b/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h
new file mode 100644
index 0000000000..b3bba486bd
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h
@@ -0,0 +1,88 @@
+#pragma once
+#include "mkql_spiller.h"
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
+
+
+namespace NKikimr::NMiniKQL {
+
+///Stores and loads very long sequences of TMultiType UVs
+///Can split sequences into chunks
+///Sends chunks to ISplitter and keeps assigned keys
+///When all data is written switches to read mode. Switching back to writing mode is not supported
+///Provides an interface for sequential read (like forward iterator)
+///When interaction with ISpiller is required, Write and Read operations return a Future
+class TWideUnboxedValuesSpillerAdapter {
+public:
+ TWideUnboxedValuesSpillerAdapter(ISpiller::TPtr spiller, const TMultiType* type, size_t sizeLimit)
+ : Spiller(spiller)
+ , ItemType(type)
+ , SizeLimit(sizeLimit)
+ , Packer(type)
+ {
+ }
+
+ /// Write wide UV item
+ /// \returns
+ /// - nullopt, if thee values are accumulated
+ /// - TFeature, if the values are being stored asynchronously and a caller must wait until async operation ends
+ /// In this case a caller must wait operation completion and call StoreCompleted.
+ /// Design note: not using Subscribe on a Future here to avoid possible race condition
+ std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) {
+ Packer.AddWideItem(wideItem.data(), wideItem.size());
+ if(Packer.PackedSizeEstimate() > SizeLimit) {
+ return Spiller->Put(std::move(Packer.Finish()));
+ } else {
+ return std::nullopt;
+ }
+ }
+
+ std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() {
+ if (Packer.IsEmpty())
+ return std::nullopt;
+ return Spiller->Put(std::move(Packer.Finish()));
+ }
+
+ void AsyncWriteCompleted(ISpiller::TKey key) {
+ StoredChunks.push_back(key);
+ }
+
+ //Extracting interface
+ bool Empty() const {
+ return StoredChunks.empty() && !CurrentBatch;
+ }
+ std::optional<NThreading::TFuture<TRope>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) {
+ MKQL_ENSURE(!Empty(), "Internal logic error");
+ if (CurrentBatch) {
+ auto row = CurrentBatch->Head();
+ for (size_t i = 0; i != wideItem.size(); ++i) {
+ wideItem[i] = row[i];
+ }
+ CurrentBatch->Pop();
+ if (CurrentBatch->empty()) {
+ CurrentBatch = std::nullopt;
+ }
+ return std::nullopt;
+ } else {
+ auto r = Spiller->Get(StoredChunks.front());
+ StoredChunks.pop_front();
+ return r;
+ }
+ }
+
+ void AsyncReadCompleted(TRope&& rope,const THolderFactory& holderFactory ) {
+ //Implementation detail: deserialization is performed in a processing thread
+ TUnboxedValueBatch batch(ItemType);
+ Packer.UnpackBatch(std::move(rope), holderFactory, batch);
+ CurrentBatch = std::move(batch);
+ }
+
+private:
+ ISpiller::TPtr Spiller;
+ const TMultiType* const ItemType;
+ const size_t SizeLimit;
+ TValuePackerTransport<false> Packer;
+ std::deque<ISpiller::TKey> StoredChunks;
+ std::optional<TUnboxedValueBatch> CurrentBatch;
+};
+
+}//namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp b/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
new file mode 100644
index 0000000000..4f3953623c
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
@@ -0,0 +1,73 @@
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h>
+#include <ydb/library/yql/minikql/computation/mkql_spiller.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <vector>
+#include <utility>
+#include <algorithm>
+
+namespace NKikimr::NMiniKQL {
+
+Y_UNIT_TEST_SUITE(TestWideSpillerAdapter) {
+ constexpr size_t itemWidth = 3;
+ constexpr size_t chunkSize = 100;
+ Y_UNIT_TEST(TestWriteExtractZeroItems) {
+ TScopedAlloc alloc(__LOCATION__);
+ TTypeEnvironment env(alloc);
+ const auto spiller = MakeSpiller();
+ std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env));
+ TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize);
+ auto r = wideUVSpiller.FinishWriting();
+ UNIT_ASSERT(!r.has_value());
+ UNIT_ASSERT(wideUVSpiller.Empty());
+ }
+
+ Y_UNIT_TEST(TestWriteExtract) {
+ TScopedAlloc alloc(__LOCATION__);
+ TMemoryUsageInfo memInfo("test");
+ THolderFactory holderFactory(alloc.Ref(), memInfo);
+ TTypeEnvironment env(alloc);
+ const auto spiller = MakeSpiller();
+ std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env));
+ TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize);
+ std::vector<NUdf::TUnboxedValue> wideValue(itemWidth);
+ constexpr size_t rowCount = chunkSize*10+3;
+ for (size_t row = 0; row != rowCount; ++row) {
+ for(size_t i = 0; i != itemWidth; ++i) {
+ wideValue[i] = NUdf::TUnboxedValuePod(NUdf::TStringValue(TStringBuilder() << "Long enough string: " << row * 10 + i));
+ }
+ if (auto r = wideUVSpiller.WriteWideItem(wideValue)) {
+ wideUVSpiller.AsyncWriteCompleted(r->GetValue());
+ }
+ }
+ auto r = wideUVSpiller.FinishWriting();
+ if (r) {
+ wideUVSpiller.AsyncWriteCompleted(r->GetValue());
+ }
+
+ wideUVSpiller.AsyncWriteCompleted(r->GetValue());
+ for (size_t row = 0; row != rowCount; ++row) {
+ UNIT_ASSERT(!wideUVSpiller.Empty());
+ if (auto r = wideUVSpiller.ExtractWideItem(wideValue)) {
+ wideUVSpiller.AsyncReadCompleted(r->ExtractValue(), holderFactory);
+ r = wideUVSpiller.ExtractWideItem(wideValue);
+ UNIT_ASSERT(!r.has_value());
+ }
+ for (size_t i = 0; i != itemWidth; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(wideValue[i].AsStringRef()) , TStringBuilder() << "Long enough string: " << row * 10 + i);
+ }
+ }
+ UNIT_ASSERT(!wideUVSpiller.Empty());
+ }
+}
+
+} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt
index 2f9609b5a2..185dd5f550 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt
@@ -44,6 +44,7 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt
index 2f9609b5a2..185dd5f550 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt
@@ -44,6 +44,7 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt
index e0b561b565..38fd1bb839 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt
@@ -45,6 +45,7 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt
index e0b561b565..38fd1bb839 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt
@@ -45,6 +45,7 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt
index 2f9609b5a2..185dd5f550 100644
--- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt
@@ -44,6 +44,7 @@ target_sources(minikql-computation-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt
index 61f2fe7012..8da2d8478b 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt
@@ -41,6 +41,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt
index 01dba3f99d..440182defa 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt
@@ -42,6 +42,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt
index 67eac9242a..653994d7c9 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt
@@ -45,6 +45,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt
index 436ad6ceac..fc6f85827f 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt
@@ -46,6 +46,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt
index 4244aa874b..bfc123d31d 100644
--- a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt
@@ -35,6 +35,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp
)
set_property(
diff --git a/ydb/library/yql/minikql/computation/ut/ya.make b/ydb/library/yql/minikql/computation/ut/ya.make
index b4c2ff659f..819a216fd6 100644
--- a/ydb/library/yql/minikql/computation/ut/ya.make
+++ b/ydb/library/yql/minikql/computation/ut/ya.make
@@ -21,6 +21,7 @@ SRCS(
mkql_computation_pattern_cache_ut.cpp
mkql_validate_ut.cpp
mkql_value_builder_ut.cpp
+ mkql_spiller_adapter_ut.cpp
presort_ut.cpp
)
diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc
index 424a78ebdd..11e1687462 100644
--- a/ydb/library/yql/minikql/computation/ya.make.inc
+++ b/ydb/library/yql/minikql/computation/ya.make.inc
@@ -13,6 +13,7 @@ SRCS(
mkql_computation_node_impl.cpp
mkql_computation_node_pack.cpp
mkql_computation_node_pack_impl.cpp
+ mkql_spiller.cpp
mkql_custom_list.cpp
mkql_llvm_base.cpp
mkql_validate.cpp