summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilitov Mikhail <[email protected]>2024-07-17 16:37:15 +0200
committerGitHub <[email protected]>2024-07-17 16:37:15 +0200
commitfda767cc834942f37334c5bfe5c7d92e48e4c80a (patch)
treec8680a7a02a79633ee6845bbe5f0cbb2db636feb
parent02f915054b369d554aea8df6d318184a7834f40e (diff)
get rid of wide fields in wide combiner. Not used in llvm (#6536)
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp95
1 files changed, 55 insertions, 40 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index c28e8dee937..ff20307d634 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -132,6 +132,24 @@ struct TCombinerNodes {
}
}
+ void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue** from, NUdf::TUnboxedValue* to) const {
+ for (ui32 i = 0U; i < ItemNodes.size(); ++i) {
+ if (from[i]) {
+ to[i] = std::move(*(from[i]));
+ }
+ }
+ }
+
+ void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue* from, NUdf::TUnboxedValue** to) const {
+ for (size_t i = 0, j = 0; i != ItemNodes.size(); ++i) {
+ if (IsInputItemNodeUsed(i)) {
+ *to[i] = std::move(from[j++]);
+ } else {
+ to[i] = nullptr;
+ }
+ }
+ }
+
void ProcessItem(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue* state) const {
if (keys) {
std::fill_n(keys, KeyResultNodes.size(), NUdf::TUnboxedValuePod());
@@ -346,16 +364,16 @@ public:
enum class ETasteResult: i8 {
Init = -1,
Update,
- Skip
+ ConsumeRawData,
+ ExtractRawData
};
TSpillingSupportState(
- TMemoryUsageInfo* memInfo, size_t wideFieldsIndex,
+ TMemoryUsageInfo* memInfo,
const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize,
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
)
: TBase(memInfo)
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
- , WideFieldsIndex(wideFieldsIndex)
, UsedInputItemType(usedInputItemType)
, KeyAndStateType(keyAndStateType)
, KeyWidth(keyWidth)
@@ -380,7 +398,7 @@ public:
bool IsProcessingRequired() const {
if (InputStatus != EFetchResult::Finish) return true;
- return HasDataForProcessing;
+ return HasRawDataToExtract || HasDataForProcessing;
}
bool UpdateAndWait() {
@@ -424,10 +442,19 @@ public:
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
if (GetMode() == EOperatingMode::ProcessSpilled) {
+ if (HasRawDataToExtract) {
+ // Tongue not used here.
+ Throat = BufferForUsedInputItems.data();
+ HasRawDataToExtract = false;
+ HasDataForProcessing = true;
+ return ETasteResult::ExtractRawData;
+ }
+ HasDataForProcessing = false;
// while restoration we process buckets one by one starting from the first in a queue
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
Tongue = SpilledBuckets.front().InMemoryProcessingState->Tongue;
+ BufferForUsedInputItems.resize(0);
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
@@ -445,9 +472,13 @@ public:
// Corresponding bucket is spilled, we don't need a key anymore, full input will be spilled
BufferForKeyAndState.resize(0);
- TryToSpillRawData(bucket, bucketId);
+ // Prepare space for raw data
+ MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
+ BufferForUsedInputItems.resize(ItemNodesSize);
+ BufferForUsedInputItemsBucketId = bucketId;
+ Throat = BufferForUsedInputItems.data();
- return ETasteResult::Skip;
+ return ETasteResult::ConsumeRawData;
}
NUdf::TUnboxedValuePod* Extract() {
@@ -472,25 +503,6 @@ private:
BufferForKeyAndState.resize(0);
}
- // Copies data from WideFields to local and tries to spill it using suitable bucket.
- // if the bucket is already busy, then the buffer will wait for the next iteration.
- void TryToSpillRawData(TSpilledBucket& bucket, size_t bucketId) {
- auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
- MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error");
-
- for (size_t i = 0; i < ItemNodesSize; ++i) {
- if (fields[i]) {
- BufferForUsedInputItems.push_back(*fields[i]);
- }
- }
- if (bucket.AsyncWriteOperation.has_value()) {
- BufferForUsedInputItemsBucketId = bucketId;
- return;
- }
- bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
- BufferForUsedInputItems.resize(0);
- }
-
bool FlushSpillingBuffersAndWait() {
UpdateSpillingBuckets();
@@ -620,8 +632,14 @@ private:
}
AsyncReadOperation = std::nullopt;
}
+
auto& bucket = SpilledBuckets.front();
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
+ if (HasDataForProcessing) {
+ Tongue = bucket.InMemoryProcessingState->Tongue;
+ Throat = bucket.InMemoryProcessingState->Throat;
+ return false;
+ }
//recover spilled state
while(!bucket.SpilledState->Empty()) {
RecoverState = true;
@@ -651,17 +669,11 @@ private:
if (AsyncReadOperation) {
return true;
}
- auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
- for (size_t i = 0, j = 0; i < ItemNodesSize; ++i) {
- if (fields[i]) {
- fields[i] = &(BufferForUsedInputItems[j++]);
- }
- }
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;
- HasDataForProcessing = true;
+ HasRawDataToExtract = true;
return false;
}
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
@@ -725,8 +737,9 @@ private:
bool HasDataForProcessing = false;
+ bool HasRawDataToExtract = false;
+
TState InMemoryProcessingState;
- const size_t WideFieldsIndex;
const TMultiType* const UsedInputItemType;
const TMultiType* const KeyAndStateType;
const size_t KeyWidth;
@@ -1237,6 +1250,7 @@ public:
, AllowSpilling(allowSpilling)
{}
+ // MARK: DoCAlculate
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (!state.HasValue()) {
MakeState(ctx, state);
@@ -1246,14 +1260,12 @@ public:
auto **fields = ctx.WideFields.data() + WideFieldsIndex;
while (true) {
- for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
- fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
-
if (ptr->UpdateAndWait()) {
return EFetchResult::Yield;
}
-
if (ptr->InputStatus != EFetchResult::Finish) {
+ for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
+ fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
case EFetchResult::One:
break;
@@ -1274,7 +1286,11 @@ public:
case TSpillingSupportState::ETasteResult::Update:
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
- case TSpillingSupportState::ETasteResult::Skip:
+ case TSpillingSupportState::ETasteResult::ConsumeRawData:
+ Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
+ break;
+ case TSpillingSupportState::ETasteResult::ExtractRawData:
+ Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
}
continue;
@@ -1553,8 +1569,7 @@ public:
#endif
private:
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
- state = ctx.HolderFactory.Create<TSpillingSupportState>(WideFieldsIndex,
- UsedInputItemType, KeyAndStateType,
+ state = ctx.HolderFactory.Create<TSpillingSupportState>(UsedInputItemType, KeyAndStateType,
Nodes.KeyNodes.size(),
Nodes.ItemNodes.size(),
#ifdef MKQL_DISABLE_CODEGEN