diff options
author | aneporada <aneporada@ydb.tech> | 2023-06-06 21:51:14 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-06-06 21:51:14 +0300 |
commit | d7618ef587053f2eb7d20f0b2d89709e69009c1b (patch) | |
tree | 1d4e0f418fbf355be1293e0bdd9b37f0f5d63e2e | |
parent | f18d76db543cf768f466baf6e44549a6556daec6 (diff) | |
download | ydb-d7618ef587053f2eb7d20f0b2d89709e69009c1b.tar.gz |
Support for wide streams in dq channels
49 files changed, 1048 insertions, 325 deletions
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index 13eb649faff..a7673a46493 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -19,13 +19,19 @@ TTypedUnboxedValue TKqpExecuterTxResult::GetUV( const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& factory) { + YQL_ENSURE(!Rows.IsWide()); if (IsStream) { auto* listOfItemType = NKikimr::NMiniKQL::TListType::Create(MkqlItemType, typeEnv); - NUdf::TUnboxedValue value = factory.VectorAsArray(Rows); + + NUdf::TUnboxedValue* itemsPtr = nullptr; + auto value = factory.CreateDirectArrayHolder(Rows.RowCount(), itemsPtr); + Rows.ForEachRow([&](NUdf::TUnboxedValue& value) { + *itemsPtr++ = std::move(value); + }); return {listOfItemType, value}; } else { - YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size()); - return {MkqlItemType, Rows[0]}; + YQL_ENSURE(Rows.RowCount() == 1, "Actual buffer size: " << Rows.RowCount()); + return {MkqlItemType, *Rows.Head()}; } } @@ -42,6 +48,7 @@ NKikimrMiniKQL::TResult TKqpExecuterTxResult::GetMkql() { } void TKqpExecuterTxResult::FillMkql(NKikimrMiniKQL::TResult* mkqlResult) { + YQL_ENSURE(!Rows.IsWide()); if (IsStream) { mkqlResult->MutableType()->SetKind(NKikimrMiniKQL::List); ExportTypeToProto( @@ -49,16 +56,15 @@ void TKqpExecuterTxResult::FillMkql(NKikimrMiniKQL::TResult* mkqlResult) { *mkqlResult->MutableType()->MutableList()->MutableItem(), ColumnOrder); - for(auto& row: Rows) { + Rows.ForEachRow([&](NUdf::TUnboxedValue& value) { ExportValueToProto( - MkqlItemType, row, *mkqlResult->MutableValue()->AddList(), + MkqlItemType, value, *mkqlResult->MutableValue()->AddList(), ColumnOrder); - } - + }); } else { - YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size()); + YQL_ENSURE(Rows.RowCount() == 1, "Actual buffer size: " << Rows.RowCount()); ExportTypeToProto(MkqlItemType, *mkqlResult->MutableType()); - ExportValueToProto(MkqlItemType, Rows[0], *mkqlResult->MutableValue()); + ExportValueToProto(MkqlItemType, *Rows.Head(), *mkqlResult->MutableValue()); } } diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index 519961e0ee1..437b3036a42 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -80,7 +80,7 @@ struct TKqpExecuterTxResult { NKikimr::NMiniKQL::TType* MkqlItemType; const TVector<ui32>* ColumnOrder = nullptr; ui32 QueryResultIndex = 0; - NKikimr::NMiniKQL::TUnboxedValueVector Rows; + NKikimr::NMiniKQL::TUnboxedValueBatch Rows; explicit TKqpExecuterTxResult( bool isStream, diff --git a/ydb/core/kqp/runtime/kqp_effects.cpp b/ydb/core/kqp/runtime/kqp_effects.cpp index 35ca7b017e1..36758e44459 100644 --- a/ydb/core/kqp/runtime/kqp_effects.cpp +++ b/ydb/core/kqp/runtime/kqp_effects.cpp @@ -23,6 +23,12 @@ public: value.Apply(*ApplyCtx); } + void WideConsume(NUdf::TUnboxedValue* values, ui32 count) final { + Y_UNUSED(values); + Y_UNUSED(count); + Y_FAIL("WideConsume not supported yet"); + } + void Consume(NDqProto::TCheckpoint&&) final { Y_FAIL("Shouldn't be called"); } diff --git a/ydb/core/kqp/runtime/kqp_output_stream.cpp b/ydb/core/kqp/runtime/kqp_output_stream.cpp index b931881f785..431db53178c 100644 --- a/ydb/core/kqp/runtime/kqp_output_stream.cpp +++ b/ydb/core/kqp/runtime/kqp_output_stream.cpp @@ -45,6 +45,12 @@ public: Outputs[partitionIndex]->Push(std::move(value)); } + void WideConsume(TUnboxedValue* values, ui32 count) final { + Y_UNUSED(values); + Y_UNUSED(count); + Y_FAIL("WideConsume not supported yet"); + } + void Consume(NDqProto::TCheckpoint&&) final { Y_FAIL("Shouldn't be called"); } diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 5d791c4656d..617b4588032 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -1086,7 +1086,7 @@ public: } i64 GetAsyncInputData( - NKikimr::NMiniKQL::TUnboxedValueVector& resultVector, + NKikimr::NMiniKQL::TUnboxedValueBatch& resultBatch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) override @@ -1097,6 +1097,8 @@ public: return 0; } + YQL_ENSURE(!resultBatch.IsWide(), "Wide stream is not supported"); + CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size()); ui64 bytes = 0; while (!Results.empty()) { @@ -1120,7 +1122,7 @@ public: for (; result.ProcessedRows < result.PackedRows; ++result.ProcessedRows) { NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[result.ProcessedRows].GetElements()); - resultVector.push_back(std::move((*batch)[result.ProcessedRows])); + resultBatch.push_back(std::move((*batch)[result.ProcessedRows])); ProcessedRowCount += 1; bytes += rowSize.AllocatedBytes; if (ProcessedRowCount == Settings.GetItemsLimit()) { @@ -1129,7 +1131,7 @@ public: return bytes; } } - CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows; processed " << ProcessedRowCount << " rows"); + CA_LOG_D(TStringBuilder() << "returned " << resultBatch.RowCount() << " rows; processed " << ProcessedRowCount << " rows"); size_t rowCount = result.ReadResult.Get()->Get()->GetRowsCount(); if (rowCount == result.ProcessedRows) { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index f7106c30d8e..9d00649423b 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -203,9 +203,11 @@ private: TActorBootstrapped<TKqpStreamLookupActor>::PassAway(); } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { i64 totalDataSize = 0; + YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported"); + totalDataSize = PackResults(batch, freeSpace); auto status = FetchLookupKeys(); @@ -366,17 +368,11 @@ private: } } - ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch, i64 freeSpace) { + ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) { i64 totalSize = 0; bool sizeLimitExceeded = false; batch.clear(); - size_t rowsCount = 0; - for (const auto& result : Results) { - rowsCount += result.ReadResult->Get()->GetRowsCount(); - } - batch.reserve(rowsCount); - while (!Results.empty() && !sizeLimitExceeded) { auto& result = Results.front(); for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp index 6ee6d5163e8..3feb75e88fb 100644 --- a/ydb/core/kqp/runtime/kqp_transport.cpp +++ b/ydb/core/kqp/runtime/kqp_transport.cpp @@ -75,11 +75,11 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet( NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion); for (auto& part : data) { if (part.GetRows()) { - TUnboxedValueVector rows; + TUnboxedValueBatch rows(mkqlSrcRowType); dataSerializer.Deserialize(part, mkqlSrcRowType, rows); - for (auto& row : rows) { - ExportValueToProto(mkqlSrcRowType, row, *resultSet.add_rows(), columnOrder); - } + rows.ForEachRow([&](const NUdf::TUnboxedValue& value) { + ExportValueToProto(mkqlSrcRowType, value, *resultSet.add_rows(), columnOrder); + }); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 5e24b9880ea..8626a73d82f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -291,7 +291,7 @@ private: DoExecute(); } - void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) override { + void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) override { ProcessSourcesState.Inflight++; source.PushStarted = true; source.Finished = finished; @@ -693,7 +693,7 @@ private: } void SinkSend(ui64 index, - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TMaybe<NDqProto::TCheckpoint>&& checkpoint, i64 size, i64 checkpointSize, @@ -722,7 +722,7 @@ private: { auto guard = BindAllocator(); - NKikimr::NMiniKQL::TUnboxedValueVector data = std::move(batch); + NKikimr::NMiniKQL::TUnboxedValueBatch data = std::move(batch); sinkInfo.AsyncOutput->SendData(std::move(data), size, std::move(checkpoint), finished); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 901bf62167d..0c496218fb9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -75,7 +75,7 @@ struct IDqComputeActorAsyncInput { // Method should be called under bound mkql allocator. // Could throw YQL errors. virtual i64 GetAsyncInputData( - NKikimr::NMiniKQL::TUnboxedValueVector& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>& watermark, bool& finished, i64 freeSpace) = 0; @@ -149,7 +149,7 @@ struct IDqComputeActorAsyncOutput { // Could throw YQL errors. // Checkpoint (if any) is supposed to be ordered after batch, // and finished flag is supposed to be ordered after checkpoint. - virtual void SendData(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 dataSize, + virtual void SendData(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 dataSize, const TMaybe<NDqProto::TCheckpoint>& checkpoint, bool finished) = 0; // Checkpointing. diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 1d1e406d74a..19623a925e2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1080,7 +1080,7 @@ protected: return TaskRunner->BindAllocator(); } - virtual void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) { + virtual void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) { source.Buffer->Push(std::move(batch), space); if (finished) { source.Buffer->Finish(); @@ -1420,7 +1420,7 @@ private: ui32 SendDataChunkToAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo, ui64 bytes) { auto sink = outputInfo.Buffer; - NKikimr::NMiniKQL::TUnboxedValueVector dataBatch; + NKikimr::NMiniKQL::TUnboxedValueBatch dataBatch(sink->GetOutputType()); NDqProto::TCheckpoint checkpoint; const ui64 dataSize = !outputInfo.Finished ? sink->Pop(dataBatch, bytes) : 0; @@ -1646,14 +1646,14 @@ protected: const i64 freeSpace = AsyncInputFreeSpace(info); if (freeSpace > 0) { TMaybe<TInstant> watermark; - NKikimr::NMiniKQL::TUnboxedValueVector batch; + NKikimr::NMiniKQL::TUnboxedValueBatch batch; Y_VERIFY(info.AsyncInput); bool finished = false; const i64 space = info.AsyncInput->GetAsyncInputData(batch, watermark, finished, freeSpace); CA_LOG_T("Poll async input " << inputIndex << ". Buffer free space: " << freeSpace << ", read from async input: " << space << " bytes, " - << batch.size() << " rows, finished: " << finished); + << batch.RowCount() << " rows, finished: " << finished); if (!batch.empty()) { // If we have read some data, we must run such reading again @@ -1662,7 +1662,7 @@ protected: ContinueExecute(); } - DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.size(), watermark); + DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.RowCount(), watermark); if (watermark) { const auto inputWatermarkChanged = WatermarksTracker.NotifyAsyncInputWatermarkReceived( diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h index 0d45a9fde86..9ae31976c18 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h @@ -16,7 +16,7 @@ struct ITaskRunnerActor { virtual ~ICallbacks() = default; virtual void SinkSend( ui64 index, - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TMaybe<NDqProto::TCheckpoint>&& checkpoint, i64 size, i64 checkpointSize, @@ -29,7 +29,7 @@ struct ITaskRunnerActor { virtual void AsyncInputPush( ui64 cookie, ui64 index, - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finish) = 0; diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 45715caef0d..2638afd0fac 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -286,7 +286,7 @@ private: void AsyncInputPush( ui64 cookie, ui64 index, - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finish) override { @@ -385,7 +385,7 @@ private: auto guard = TaskRunner->BindAllocator(); auto sink = TaskRunner->GetSink(ev->Get()->Index); - NKikimr::NMiniKQL::TUnboxedValueVector batch; + NKikimr::NMiniKQL::TUnboxedValueBatch batch(sink->GetOutputType()); NDqProto::TCheckpoint checkpoint; TMaybe<NDqProto::TCheckpoint> maybeCheckpoint; i64 size = 0; diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp index 2b26d151ff7..25be6bf2eef 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp @@ -17,7 +17,7 @@ public: return InputIndex; } - void Push(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) override { + void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override { Y_VERIFY(!batch.empty() || !space); if (!batch.empty()) { AddBatch(std::move(batch), space); diff --git a/ydb/library/yql/dq/runtime/dq_async_input.h b/ydb/library/yql/dq/runtime/dq_async_input.h index a73fe4bac99..45c3ef4fc39 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.h +++ b/ydb/library/yql/dq/runtime/dq_async_input.h @@ -29,7 +29,7 @@ public: virtual ui64 GetInputIndex() const = 0; - virtual void Push(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) = 0; + virtual void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) = 0; virtual void Finish() = 0; diff --git a/ydb/library/yql/dq/runtime/dq_async_output.cpp b/ydb/library/yql/dq/runtime/dq_async_output.cpp index 5c8b38054ed..e60e7ca970f 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_output.cpp @@ -67,6 +67,12 @@ public: ReportChunkIn(); } + void WidePush(NUdf::TUnboxedValue* values, ui32 count) override { + Y_UNUSED(values); + Y_UNUSED(count); + YQL_ENSURE(false, "Wide stream is not supported"); + } + void Push(NDqProto::TWatermark&& watermark) override { const ui64 bytesSize = watermark.ByteSize(); Values.emplace_back(std::move(watermark), bytesSize); @@ -91,7 +97,7 @@ public: } } - ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) override { + ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) override { batch.clear(); ui64 valuesCount = 0; ui64 usedBytes = 0; @@ -106,7 +112,6 @@ public: } // Reserve size and return data. - batch.reserve(valuesCount); while (valuesCount--) { batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(Values.front().Value))); Values.pop_front(); @@ -114,7 +119,7 @@ public: Y_VERIFY(EstimatedStoredBytes >= usedBytes); EstimatedStoredBytes -= usedBytes; - ReportChunkOut(batch.size(), usedBytes); + ReportChunkOut(batch.RowCount(), usedBytes); return usedBytes; } diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h index 86e57b6e58c..c8e8eb8be72 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.h +++ b/ydb/library/yql/dq/runtime/dq_async_output.h @@ -32,7 +32,7 @@ public: // Pop data to send. Return estimated size of returned data. [[nodiscard]] - virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) = 0; + virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) = 0; // Pop watermark [[nodiscard]] virtual bool Pop(NDqProto::TWatermark& watermark) = 0; diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp index a5b1b4babcd..81b3761004d 100644 --- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp +++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp @@ -7,21 +7,29 @@ namespace NYql::NDq { using namespace NKikimr::NMiniKQL; TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf columnName) { - YQL_ENSURE(type->GetKind() == TType::EKind::Struct); - const auto& structType = static_cast<const TStructType&>(*type); - - auto columnIndex = structType.FindMemberIndex(columnName); - if (!columnIndex) { - return {}; + TType* memberType = nullptr; + ui32 idx; + if (type->GetKind() == TType::EKind::Multi) { + const auto& multiType = static_cast<const TMultiType&>(*type); + YQL_ENSURE(TryFromString(columnName, idx), "Expecting number as column name"); + YQL_ENSURE(idx < multiType.GetElementsCount(), "Invalid column index"); + memberType = multiType.GetElementType(idx); + } else { + YQL_ENSURE(type->GetKind() == TType::EKind::Struct); + const auto& structType = static_cast<const TStructType&>(*type); + auto columnIndex = structType.FindMemberIndex(columnName); + if (!columnIndex) { + return {}; + } + memberType = structType.GetMemberType(*columnIndex); + idx = *columnIndex; } - auto memberType = structType.GetMemberType(*columnIndex); - if (memberType->GetKind() == TType::EKind::Optional) { memberType = static_cast<TOptionalType&>(*memberType).GetItemType(); } - return TColumnInfo{TString(columnName), *columnIndex, memberType}; + return TColumnInfo{TString(columnName), idx, memberType}; } TColumnInfo GetColumnInfo(const TType* type, TStringBuf columnName) { diff --git a/ydb/library/yql/dq/runtime/dq_input.h b/ydb/library/yql/dq/runtime/dq_input.h index 4067549a82f..d027d1d03a7 100644 --- a/ydb/library/yql/dq/runtime/dq_input.h +++ b/ydb/library/yql/dq/runtime/dq_input.h @@ -31,13 +31,20 @@ public: virtual bool Empty() const = 0; [[nodiscard]] - virtual bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) = 0; + virtual bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) = 0; virtual bool IsFinished() const = 0; virtual const TDqInputStats* GetStats() const = 0; virtual NKikimr::NMiniKQL::TType* GetInputType() const = 0; + inline TMaybe<ui32> GetInputWidth() const { + auto type = GetInputType(); + if (type->IsMulti()) { + return static_cast<const NKikimr::NMiniKQL::TMultiType*>(type)->GetElementsCount(); + } + return {}; + } // Checkpointing // After pause IDqInput::Pop() stops return batches that were pushed before pause diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp index 3431bfdbdb2..85956f9ecae 100644 --- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp @@ -13,18 +13,17 @@ private: void PushImpl(NDqProto::TData&& data) { const i64 space = data.GetRaw().size(); - NKikimr::NMiniKQL::TUnboxedValueVector buffer; - buffer.reserve(data.GetRows()); - + NKikimr::NMiniKQL::TUnboxedValueBatch batch(InputType); if (Y_UNLIKELY(ProfileStats)) { auto startTime = TInstant::Now(); - DataSerializer.Deserialize(data, InputType, buffer); + DataSerializer.Deserialize(data, InputType, batch); ProfileStats->DeserializationTime += (TInstant::Now() - startTime); } else { - DataSerializer.Deserialize(data, InputType, buffer); + DataSerializer.Deserialize(data, InputType, batch); } - AddBatch(std::move(buffer), space); + YQL_ENSURE(batch.RowCount() == data.GetRows()); + AddBatch(std::move(batch), space); } void DeserializeAllData() { @@ -73,7 +72,7 @@ public: } [[nodiscard]] - bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override { + bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override { if (Batches.empty()) { DeserializeAllData(); } diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h index dca47f85dd5..c4a672cfa7e 100644 --- a/ydb/library/yql/dq/runtime/dq_input_impl.h +++ b/ydb/library/yql/dq/runtime/dq_input_impl.h @@ -1,5 +1,8 @@ #pragma once +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_node.h> + namespace NYql::NDq { template <class TDerived, class IInputInterface> @@ -7,6 +10,7 @@ class TDqInputImpl : public IInputInterface { public: TDqInputImpl(NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes) : InputType(inputType) + , Width(inputType->IsMulti() ? static_cast<const NKikimr::NMiniKQL::TMultiType*>(inputType)->GetElementsCount() : TMaybe<ui32>{}) , MaxBufferBytes(maxBufferBytes) { } @@ -24,14 +28,16 @@ public: return Batches.empty() || (IsPaused() && GetBatchesBeforePause() == 0); } - void AddBatch(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) { + void AddBatch(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) { + Y_VERIFY(batch.Width() == GetWidth()); + StoredBytes += space; - StoredRows += batch.size(); + StoredRows += batch.RowCount(); auto& stats = MutableBasicStats(); stats.Chunks++; stats.Bytes += space; - stats.RowsIn += batch.size(); + stats.RowsIn += batch.RowCount(); if (!stats.FirstRowTs) { stats.FirstRowTs = TInstant::Now(); } @@ -44,7 +50,8 @@ public: } [[nodiscard]] - bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override { + bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override { + Y_VERIFY(batch.Width() == GetWidth()); if (Empty()) { return false; } @@ -55,12 +62,22 @@ public: ui64 batchesCount = GetBatchesBeforePause(); Y_VERIFY(batchesCount <= Batches.size()); - batch.reserve(StoredRowsBeforePause); - - while (batchesCount--) { - auto& part = Batches.front(); - std::move(part.begin(), part.end(), std::back_inserter(batch)); - Batches.pop_front(); + if (batch.IsWide()) { + while (batchesCount--) { + auto& part = Batches.front(); + part.ForEachRowWide([&batch](NUdf::TUnboxedValue* values, ui32 width) { + batch.PushRow(values, width); + }); + Batches.pop_front(); + } + } else { + while (batchesCount--) { + auto& part = Batches.front(); + part.ForEachRow([&batch](NUdf::TUnboxedValue& value) { + batch.emplace_back(std::move(value)); + }); + Batches.pop_front(); + } } BatchesBeforePause = PauseMask; @@ -70,10 +87,18 @@ public: StoredBytesBeforePause = 0; StoredRowsBeforePause = 0; } else { - batch.reserve(StoredRows); - - for (auto&& part : Batches) { - std::move(part.begin(), part.end(), std::back_inserter(batch)); + if (batch.IsWide()) { + for (auto&& part : Batches) { + part.ForEachRowWide([&batch](NUdf::TUnboxedValue* values, ui32 width) { + batch.PushRow(values, width); + }); + } + } else { + for (auto&& part : Batches) { + part.ForEachRow([&batch](NUdf::TUnboxedValue& value) { + batch.emplace_back(std::move(value)); + }); + } } StoredBytes = 0; @@ -81,7 +106,7 @@ public: Batches.clear(); } - MutableBasicStats().RowsOut += batch.size(); + MutableBasicStats().RowsOut += batch.RowCount(); return true; } @@ -128,10 +153,15 @@ protected: return BatchesBeforePause & ~PauseMask; } + TMaybe<ui32> GetWidth() const { + return Width; + } + protected: NKikimr::NMiniKQL::TType* const InputType = nullptr; + const TMaybe<ui32> Width; const ui64 MaxBufferBytes = 0; - TList<NKikimr::NMiniKQL::TUnboxedValueVector, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>> Batches; + TList<NKikimr::NMiniKQL::TUnboxedValueBatch, NKikimr::NMiniKQL::TMKQLAllocator<NKikimr::NMiniKQL::TUnboxedValueBatch>> Batches; ui64 StoredBytes = 0; ui64 StoredRows = 0; bool Finished = false; diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index d86bcd90a1e..7860afeff2c 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -12,18 +12,21 @@ using namespace NUdf; namespace { -class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamValue> { +template<bool IsWide> +class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamValue<IsWide>> { + using TBase = TComputationValue<TDqInputUnionStreamValue<IsWide>>; public: TDqInputUnionStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs, TDqMeteringStats::TInputStatsMeter stats) - : TComputationValue<TDqInputUnionStreamValue>(memInfo) + : TBase(memInfo) , Inputs(std::move(inputs)) - , CurrentItemIndex(0) + , Batch(Inputs.empty() ? nullptr : Inputs.front()->GetInputType()) , Stats(stats) {} private: NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final { - if (CurrentItemIndex >= CurrentBuffer.size()) { + MKQL_ENSURE(!IsWide, "Using Fetch() on wide input"); + if (Batch.empty()) { auto status = FindBuffer(); switch (status) { case NUdf::EFetchStatus::Ok: @@ -34,21 +37,45 @@ private: } } - result = std::move(CurrentBuffer[CurrentItemIndex]); + result = std::move(*Batch.Head()); + Batch.Pop(); + if (Stats) { Stats.Add(result); } - ++CurrentItemIndex; + return NUdf::EFetchStatus::Ok; + } + + NUdf::EFetchStatus WideFetch(NKikimr::NUdf::TUnboxedValue* result, ui32 width) final { + YQL_ENSURE(IsWide, "Using WideFetch() on narrow input"); + if (Batch.empty()) { + auto status = FindBuffer(); + switch (status) { + case NUdf::EFetchStatus::Ok: + break; + case NUdf::EFetchStatus::Finish: + case NUdf::EFetchStatus::Yield: + return status; + } + } + + MKQL_ENSURE_S(Batch.Width() == width); + NUdf::TUnboxedValue* head = Batch.Head(); + std::move(head, head + width, result); + Batch.Pop(); + + if (Stats) { + Stats.Add(result, width); + } return NUdf::EFetchStatus::Ok; } NUdf::EFetchStatus FindBuffer() { bool allFinished = true; - CurrentBuffer.clear(); + Batch.clear(); for (auto& input : Inputs) { - if (input->Pop(CurrentBuffer)) { - CurrentItemIndex = 0; + if (input->Pop(Batch)) { return NUdf::EFetchStatus::Ok; } allFinished &= input->IsFinished(); @@ -59,46 +86,47 @@ private: private: TVector<IDqInput::TPtr> Inputs; - TUnboxedValueVector CurrentBuffer; - ui64 CurrentItemIndex; + TUnboxedValueBatch Batch; TDqMeteringStats::TInputStatsMeter Stats; }; -class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamValue> { +template<bool IsWide> +class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamValue<IsWide>> { + using TBase = TComputationValue<TDqInputMergeStreamValue<IsWide>>; public: TDqInputMergeStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs, TVector<TSortColumnInfo>&& sortCols, TDqMeteringStats::TInputStatsMeter stats) - : TComputationValue<TDqInputMergeStreamValue>(memInfo) + : TBase(memInfo) , Inputs(std::move(inputs)) , SortCols(std::move(sortCols)) , Stats(stats) - { - CurrentBuffers.resize(Inputs.size()); - CurrentItemIndexes.reserve(Inputs.size()); - for (ui32 idx = 0; idx < Inputs.size(); ++idx) { - CurrentItemIndexes.emplace_back(TUnboxedValuesIterator(*this, Inputs[idx], CurrentBuffers[idx])); - } - for (auto& sortCol : SortCols) { - const auto typeId = sortCol.GetTypeId(); - TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId); - YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << typeId); - YQL_ENSURE(IsTypeSupportedInMergeCn(*maybeDataSlot), "Column '" << sortCol.Name << - "' has unsupported type for Merge connection: " << *maybeDataSlot); - SortColTypes[sortCol.Index] = *maybeDataSlot; - } + { + CurrentBuffers.resize(Inputs.size()); + CurrentItemIndexes.reserve(Inputs.size()); + for (ui32 idx = 0; idx < Inputs.size(); ++idx) { + CurrentItemIndexes.emplace_back(TUnboxedValuesIterator<IsWide>(*this, Inputs[idx], CurrentBuffers[idx])); } + for (auto& sortCol : SortCols) { + const auto typeId = sortCol.GetTypeId(); + TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId); + YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << typeId); + YQL_ENSURE(IsTypeSupportedInMergeCn(*maybeDataSlot), "Column '" << sortCol.Name << + "' has unsupported type for Merge connection: " << *maybeDataSlot); + SortColTypes[sortCol.Index] = *maybeDataSlot; + } + } private: + template<bool IsWideIter> class TUnboxedValuesIterator { private: - TUnboxedValueVector* Data = nullptr; + TUnboxedValueBatch* Data = nullptr; IDqInput::TPtr Input; const TDqInputMergeStreamValue* Comparator = nullptr; - ui32 CurrentIndex = 0; + ui32 Width_; public: NUdf::EFetchStatus FindBuffer() { Data->clear(); - CurrentIndex = 0; if (Input->Pop(*Data)) { return NUdf::EFetchStatus::Ok; } @@ -106,34 +134,40 @@ private: return Input->IsFinished() ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield; } - TUnboxedValuesIterator(const TDqInputMergeStreamValue& comparator, IDqInput::TPtr input, TUnboxedValueVector& data) + TUnboxedValuesIterator(const TDqInputMergeStreamValue<IsWideIter>& comparator, IDqInput::TPtr input, TUnboxedValueBatch& data) : Data(&data) , Input(input) , Comparator(&comparator) + , Width_(data.IsWide() ? *data.Width() : 1u) { - } bool IsYield() const { - return CurrentIndex == Data->size(); + return Data->empty(); } bool operator<(const TUnboxedValuesIterator& item) const { return Comparator->CompareSortCols(GetValue(), item.GetValue()) > 0; } - void operator++() { - ++CurrentIndex; - Y_VERIFY(CurrentIndex <= Data->size()); + + ui32 Width() const { + return Width_; } - NKikimr::NUdf::TUnboxedValue& GetValue() { - return (*Data)[CurrentIndex]; + + void Pop() { + Data->Pop(); + } + + NKikimr::NUdf::TUnboxedValue* GetValue() { + return Data->Head(); } - const NKikimr::NUdf::TUnboxedValue& GetValue() const { - return (*Data)[CurrentIndex]; + const NKikimr::NUdf::TUnboxedValue* GetValue() const { + return Data->Head(); } }; NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final { + YQL_ENSURE(!IsWide, "Using Fetch() on wide input"); auto status = CheckBuffers(); switch (status) { case NUdf::EFetchStatus::Ok: @@ -143,31 +177,60 @@ private: return status; } - result = std::move(FindResult()); + CopyResult(&result, 1); if (Stats) { Stats.Add(result); } return NUdf::EFetchStatus::Ok; } - NKikimr::NUdf::TUnboxedValue FindResult() { + NUdf::EFetchStatus WideFetch(NKikimr::NUdf::TUnboxedValue* result, ui32 width) final { + YQL_ENSURE(IsWide, "Using WideFetch() on narrow input"); + auto status = CheckBuffers(); + switch (status) { + case NUdf::EFetchStatus::Ok: + break; + case NUdf::EFetchStatus::Finish: + case NUdf::EFetchStatus::Yield: + return status; + } + + YQL_ENSURE(!Inputs.empty() && *Inputs.front()->GetInputWidth() == width); + CopyResult(result, width); + if (Stats) { + Stats.Add(result, width); + } + return NUdf::EFetchStatus::Ok; + } + + + void CopyResult(NKikimr::NUdf::TUnboxedValue* dst, ui32 width) { YQL_ENSURE(CurrentItemIndexes.size()); std::pop_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end()); auto& current = CurrentItemIndexes.back(); Y_VERIFY(!current.IsYield()); - NKikimr::NUdf::TUnboxedValue res = current.GetValue(); - ++current; + YQL_ENSURE(width == current.Width()); + + NKikimr::NUdf::TUnboxedValue* res = current.GetValue(); + std::move(res, res + width, dst); + current.Pop(); if (!current.IsYield()) { std::push_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end()); } - return res; } - int CompareSortCols(const NKikimr::NUdf::TUnboxedValue& lhs, const NKikimr::NUdf::TUnboxedValue& rhs) const { + int CompareSortCols(const NKikimr::NUdf::TUnboxedValue* lhs, const NKikimr::NUdf::TUnboxedValue* rhs) const { int compRes = 0; for (auto sortCol = SortCols.begin(); sortCol != SortCols.end() && compRes == 0; ++sortCol) { - auto lhsColValue = lhs.GetElement(sortCol->Index); - auto rhsColValue = rhs.GetElement(sortCol->Index); + NKikimr::NUdf::TUnboxedValue lhsColValue; + NKikimr::NUdf::TUnboxedValue rhsColValue; + if constexpr (IsWide) { + lhsColValue = *(lhs + sortCol->Index); + rhsColValue = *(rhs + sortCol->Index); + } else { + lhsColValue = lhs->GetElement(sortCol->Index); + rhsColValue = rhs->GetElement(sortCol->Index); + } auto it = SortColTypes.find(sortCol->Index); Y_VERIFY(it != SortColTypes.end()); compRes = NKikimr::NMiniKQL::CompareValues(it->second, @@ -217,13 +280,27 @@ private: private: TVector<IDqInput::TPtr> Inputs; TVector<TSortColumnInfo> SortCols; - TVector<TUnboxedValueVector> CurrentBuffers; - TVector<TUnboxedValuesIterator> CurrentItemIndexes; + TVector<TUnboxedValueBatch> CurrentBuffers; + TVector<TUnboxedValuesIterator<IsWide>> CurrentItemIndexes; ui32 InitializationIndex = 0; TMap<ui32, EDataSlot> SortColTypes; TDqMeteringStats::TInputStatsMeter Stats; }; +bool IsWideInputs(const TVector<IDqInput::TPtr>& inputs) { + NKikimr::NMiniKQL::TType* type = nullptr; + bool isWide = false; + for (auto& input : inputs) { + if (!type) { + type = input->GetInputType(); + isWide = input->GetInputWidth().Defined(); + } else { + YQL_ENSURE(type->IsSameType(*input->GetInputType())); + } + } + return isWide; +} + } // namespace void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue& val) { @@ -237,16 +314,42 @@ void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue& } } +void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue* row, ui32 width) { + Stats->RowsConsumed += 1; + if (InputType) { + YQL_ENSURE(InputType->IsMulti()); + auto multiType = static_cast<const TMultiType*>(InputType); + YQL_ENSURE(width == multiType->GetElementsCount()); + + NYql::NDq::TDqDataSerializer::TEstimateSizeSettings settings; + settings.DiscardUnsupportedTypes = true; + settings.WithHeaders = false; + + ui64 size = 0; + for (ui32 i = 0; i < multiType->GetElementsCount(); ++i) { + size += TDqDataSerializer::EstimateSize(row[i], multiType->GetElementType(i), nullptr, settings); + } + + Stats->BytesConsumed += Max<ui64>(size, 8 /* billing size for count(*) */); + } +} + NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs, const NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats) { - return factory.Create<TDqInputUnionStreamValue>(std::move(inputs), stats); + if (IsWideInputs(inputs)) { + return factory.Create<TDqInputUnionStreamValue<true>>(std::move(inputs), stats); + } + return factory.Create<TDqInputUnionStreamValue<false>>(std::move(inputs), stats); } NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs, TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats) { - return factory.Create<TDqInputMergeStreamValue>(std::move(inputs), std::move(sortCols), stats); + if (IsWideInputs(inputs)) { + return factory.Create<TDqInputMergeStreamValue<true>>(std::move(inputs), std::move(sortCols), stats); + } + return factory.Create<TDqInputMergeStreamValue<false>>(std::move(inputs), std::move(sortCols), stats); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.h b/ydb/library/yql/dq/runtime/dq_input_producer.h index 60628f76268..d4e96b2aebb 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.h +++ b/ydb/library/yql/dq/runtime/dq_input_producer.h @@ -13,6 +13,7 @@ struct TDqMeteringStats { struct TInputStatsMeter { void Add(const NKikimr::NUdf::TUnboxedValue&); + void Add(const NKikimr::NUdf::TUnboxedValue* row, ui32 width); operator bool() { return Stats; } TInputStats* Stats = nullptr; diff --git a/ydb/library/yql/dq/runtime/dq_output.h b/ydb/library/yql/dq/runtime/dq_output.h index c0251410782..23dd7dac29b 100644 --- a/ydb/library/yql/dq/runtime/dq_output.h +++ b/ydb/library/yql/dq/runtime/dq_output.h @@ -43,6 +43,7 @@ public: virtual bool IsFull() const = 0; // can throw TDqChannelStorageException virtual void Push(NUdf::TUnboxedValue&& value) = 0; + virtual void WidePush(NUdf::TUnboxedValue* values, ui32 count) = 0; virtual void Push(NDqProto::TWatermark&& watermark) = 0; // Push checkpoint. Checkpoints may be pushed to channel even after it is finished. virtual void Push(NDqProto::TCheckpoint&& checkpoint) = 0; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index 940e899211f..266568920cd 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -55,6 +55,7 @@ public: , BasicStats(ChannelId) , ProfileStats(settings.CollectProfileStats ? &BasicStats : nullptr) , Packer(OutputType) + , Width(OutputType->IsMulti() ? static_cast<NMiniKQL::TMultiType*>(OutputType)->GetElementsCount() : 1u) , Storage(settings.ChannelStorage) , HolderFactory(holderFactory) , TransportVersion(settings.TransportVersion) @@ -81,7 +82,18 @@ public: return Storage->IsFull(); } - void Push(NUdf::TUnboxedValue&& value) override { + virtual void Push(NUdf::TUnboxedValue&& value) override { + YQL_ENSURE(!OutputType->IsMulti()); + DoPush(&value, 1); + } + + virtual void WidePush(NUdf::TUnboxedValue* values, ui32 width) override { + YQL_ENSURE(OutputType->IsMulti()); + YQL_ENSURE(Width == width); + DoPush(values, width); + } + + void DoPush(NUdf::TUnboxedValue* values, ui32 width) { TProfileGuard guard(ProfileStats ? &ProfileStats->SerializationTime : nullptr); if (!BasicStats.FirstRowIn) { BasicStats.FirstRowIn = TInstant::Now(); @@ -97,8 +109,15 @@ public: return; } - Packer.AddItem(value); - value = {}; + if (OutputType->IsMulti()) { + Packer.AddWideItem(values, width); + } else { + Packer.AddItem(*values); + } + for (ui32 i = 0; i < width; ++i) { + values[i] = {}; + } + ChunkRowCount++; BasicStats.RowsIn++; @@ -265,7 +284,7 @@ public: ChunkRowCount = 0; } - NKikimr::NMiniKQL::TUnboxedValueVector rows; + NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType); for (;;) { NDqProto::TData chunk; if (!this->Pop(chunk)) { @@ -275,11 +294,17 @@ public: Packer.UnpackBatch(buf, HolderFactory, rows); } - for (auto& row : rows) { - Packer.AddItem(row); + if (OutputType->IsMulti()) { + rows.ForEachRowWide([this](const auto* values, ui32 width) { + Packer.AddWideItem(values, width); + }); + } else { + rows.ForEachRow([this](const auto& value) { + Packer.AddItem(value); + }); } - data.SetRows(rows.size()); + data.SetRows(rows.RowCount()); auto buffer = Packer.FinishAndPull(); data.MutableRaw()->reserve(buffer.Size()); buffer.CopyTo(*data.MutableRaw()); @@ -330,6 +355,7 @@ private: TDqOutputChannelStats BasicStats; TDqOutputChannelStats* ProfileStats = nullptr; NKikimr::NMiniKQL::TValuePackerTransport<FastPack> Packer; + const ui32 Width; const IDqChannelStorage::TPtr Storage; const NMiniKQL::THolderFactory& HolderFactory; const NDqProto::EDataTransportVersion TransportVersion; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp index 20800da3223..1d72c08ffde 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp @@ -26,6 +26,11 @@ void Log(TStringBuf msg) { #endif } +enum EChannelWidth { + NARROW_CHANNEL, + WIDE_CHANNEL, +}; + struct TTestContext { TScopedAlloc Alloc; TTypeEnvironment TypeEnv; @@ -33,18 +38,22 @@ struct TTestContext { THolderFactory HolderFactory; TDefaultValueBuilder Vb; NDqProto::EDataTransportVersion TransportVersion; + bool IsWide; TDqDataSerializer Ds; TStructType* OutputType = nullptr; + TMultiType* WideOutputType = nullptr; - TTestContext(NDqProto::EDataTransportVersion transportVersion = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, bool bigRows = false) + TTestContext(EChannelWidth width = NARROW_CHANNEL, NDqProto::EDataTransportVersion transportVersion = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, bool bigRows = false) : Alloc(__LOCATION__) , TypeEnv(Alloc) , MemInfo("Mem") , HolderFactory(Alloc.Ref(), MemInfo) , Vb(HolderFactory) , TransportVersion(transportVersion) + , IsWide(width == WIDE_CHANNEL) , Ds(TypeEnv, HolderFactory, TransportVersion) { + //TMultiType::Create(ui32 elementsCount, TType *const *elements, const TTypeEnvironment &env) if (bigRows) { TStructMember members[3] = { {"x", TDataType::Create(NUdf::TDataType<i32>::Id, TypeEnv)}, @@ -59,9 +68,27 @@ struct TTestContext { }; OutputType = TStructType::Create(2, members, TypeEnv); } + + TVector<TType*> components; + for (ui32 i = 0; i < OutputType->GetMembersCount(); ++i) { + components.push_back(OutputType->GetMemberType(i)); + } + WideOutputType = TMultiType::Create(components.size(), components.data(), TypeEnv); } - NUdf::TUnboxedValue CreateRow(ui32 value) { + TUnboxedValueBatch CreateRow(ui32 value) { + if (IsWide) { + TUnboxedValueBatch result(WideOutputType); + result.PushRow([&](ui32 idx) { + if (idx == 0) { + return NUdf::TUnboxedValuePod(value); + } else if (idx == 1) { + return NUdf::TUnboxedValuePod((ui64)(value * value)); + } + return NMiniKQL::MakeString("***"); + }); + return result; + } NUdf::TUnboxedValue* items; auto row = Vb.NewArray(OutputType->GetMembersCount(), items); items[0] = NUdf::TUnboxedValuePod(value); @@ -69,10 +96,24 @@ struct TTestContext { if (OutputType->GetMembersCount() == 3) { items[2] = NMiniKQL::MakeString("***"); } - return row; + TUnboxedValueBatch result(OutputType); + result.emplace_back(std::move(row)); + return result; } - NUdf::TUnboxedValue CreateBigRow(ui32 value, ui32 size) { + TUnboxedValueBatch CreateBigRow(ui32 value, ui32 size) { + if (IsWide) { + TUnboxedValueBatch result(WideOutputType); + result.PushRow([&](ui32 idx) { + if (idx == 0) { + return NUdf::TUnboxedValuePod(value); + } else if (idx == 1) { + return NUdf::TUnboxedValuePod((ui64)(value * value)); + } + return NMiniKQL::MakeString(std::string(size, '*')); + }); + return result; + } NUdf::TUnboxedValue* items; auto row = Vb.NewArray(OutputType->GetMembersCount(), items); items[0] = NUdf::TUnboxedValuePod(value); @@ -80,10 +121,57 @@ struct TTestContext { if (OutputType->GetMembersCount() == 3) { items[2] = NMiniKQL::MakeString(std::string(size, '*')); } - return row; + TUnboxedValueBatch result(OutputType); + result.emplace_back(std::move(row)); + return result; + } + + TType* GetOutputType() const { + if (IsWide) { + return WideOutputType; + } + return OutputType; + } + + ui32 Width() const { + if (IsWide) { + return WideOutputType->GetElementsCount(); + } + return 1u; } }; +void ValidateBatch(const TTestContext& ctx, const TUnboxedValueBatch& batch, ui32 startIndex, size_t expectedBatchSize) { + UNIT_ASSERT_VALUES_EQUAL(expectedBatchSize, batch.RowCount()); + ui32 i = 0; + if (ctx.IsWide) { + batch.ForEachRowWide([&](const NUdf::TUnboxedValue* values, ui32 width) { + ui32 j = i + startIndex; + UNIT_ASSERT_VALUES_EQUAL(width, ctx.Width()); + UNIT_ASSERT_VALUES_EQUAL(j, values[0].Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(j * j, values[1].Get<ui64>()); + ++i; + }); + } else { + batch.ForEachRow([&](const NUdf::TUnboxedValue& value) { + ui32 j = i + startIndex; + UNIT_ASSERT_VALUES_EQUAL(j, value.GetElement(0).Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(j * j, value.GetElement(1).Get<ui64>()); + ++i; + }); + } + UNIT_ASSERT_VALUES_EQUAL(expectedBatchSize, i); +} + +void PushRow(const TTestContext& ctx, TUnboxedValueBatch&& row, const IDqOutputChannel::TPtr& ch) { + auto* values = row.Head(); + if (ctx.IsWide) { + ch->WidePush(values, *row.Width()); + } else { + ch->Push(std::move(*values)); + } +} + void TestSingleRead(TTestContext& ctx) { TDqOutputChannelSettings settings; settings.MaxStoredBytes = 1000; @@ -91,12 +179,12 @@ void TestSingleRead(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 10; ++i) { auto row = ctx.CreateRow(i); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->Chunks); @@ -111,15 +199,10 @@ void TestSingleRead(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(10, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(10, ch->GetStats()->RowsOut); - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - - UNIT_ASSERT_VALUES_EQUAL(10, buffer.size()); - for (i32 i = 0; i < 10; ++i) { - UNIT_ASSERT_VALUES_EQUAL(i, buffer[i].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i].GetElement(1).Get<ui64>()); - } + TUnboxedValueBatch buffer(ctx.GetOutputType()); + ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, 0, 10); data.Clear(); UNIT_ASSERT(!ch->Pop(data)); } @@ -131,12 +214,12 @@ void TestPartialRead(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 9; ++i) { auto row = ctx.CreateRow(i); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->Chunks); @@ -160,16 +243,9 @@ void TestPartialRead(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(readRows + data.GetRows(), ch->GetStats()->RowsOut); - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - - UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size()); - for (ui32 i = 0; i < data.GetRows(); ++i) { - ui32 j = readRows + i; - UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>()); - } - + TUnboxedValueBatch buffer(ctx.GetOutputType()); + ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, readRows, data.GetRows()); readRows += data.GetRows(); } @@ -184,12 +260,12 @@ void TestOverflow(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 8; ++i) { auto row = ctx.CreateRow(i); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } UNIT_ASSERT_VALUES_EQUAL(8, ch->GetStats()->RowsIn); @@ -198,7 +274,7 @@ void TestOverflow(TTestContext& ctx) { UNIT_ASSERT(ch->IsFull()); try { auto row = ctx.CreateRow(100'500); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); UNIT_FAIL(""); } catch (yexception& e) { UNIT_ASSERT(TString(e.what()).Contains("requirement !IsFull() failed")); @@ -212,32 +288,26 @@ void TestPopAll(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 50; ++i) { auto row = ctx.CreateRow(i); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } UNIT_ASSERT_VALUES_EQUAL(50, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut); NDqProto::TData data; - TUnboxedValueVector buffer; + TUnboxedValueBatch buffer(ctx.GetOutputType()); UNIT_ASSERT(ch->PopAll(data)); UNIT_ASSERT_VALUES_EQUAL(50, data.GetRows()); - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - UNIT_ASSERT_VALUES_EQUAL(50, buffer.size()); - - for (i32 i = 0; i < 50; ++i) { - UNIT_ASSERT_VALUES_EQUAL(i, buffer[i].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i].GetElement(1).Get<ui64>()); - } - + ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, 0, 50); data.Clear(); UNIT_ASSERT(!ch->Pop(data)); } @@ -249,18 +319,18 @@ void TestBigRow(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log); { auto row = ctx.CreateRow(1); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } { for (ui32 i = 2; i < 10; ++i) { auto row = ctx.CreateBigRow(i, 10_MB); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } } @@ -277,14 +347,27 @@ void TestBigRow(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(2, ch->GetStats()->RowsOut); - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); + TUnboxedValueBatch buffer(ctx.GetOutputType()); + ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + + UNIT_ASSERT_VALUES_EQUAL(2, buffer.RowCount()); + ui32 i = 1; - UNIT_ASSERT_VALUES_EQUAL(2, buffer.size()); - for (ui32 i = 1; i < 3; ++i) { - UNIT_ASSERT_VALUES_EQUAL(i, buffer[i - 1].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i - 1].GetElement(1).Get<ui64>()); + if (ctx.IsWide) { + buffer.ForEachRowWide([&](const NUdf::TUnboxedValue* values, ui32 width) { + UNIT_ASSERT_VALUES_EQUAL(width, ctx.Width()); + UNIT_ASSERT_VALUES_EQUAL(i, values[0].Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(i * i, values[1].Get<ui64>()); + ++i; + }); + } else { + buffer.ForEachRow([&](const NUdf::TUnboxedValue& value) { + UNIT_ASSERT_VALUES_EQUAL(i, value.GetElement(0).Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(i * i, value.GetElement(1).Get<ui64>()); + ++i; + }); } + UNIT_ASSERT_VALUES_EQUAL(3, i); } for (ui32 i = 3; i < 10; ++i) { @@ -296,12 +379,19 @@ void TestBigRow(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->RowsOut); - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); + TUnboxedValueBatch buffer(ctx.GetOutputType()); + ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); - UNIT_ASSERT_VALUES_EQUAL(1, buffer.size()); - UNIT_ASSERT_VALUES_EQUAL(i, buffer[0].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[0].GetElement(1).Get<ui64>()); + UNIT_ASSERT_VALUES_EQUAL(1, buffer.RowCount()); + + auto head = buffer.Head(); + if (ctx.IsWide) { + UNIT_ASSERT_VALUES_EQUAL(i, head[0].Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(i * i, head[1].Get<ui64>()); + } else { + UNIT_ASSERT_VALUES_EQUAL(i, head->GetElement(0).Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(i * i, head->GetElement(1).Get<ui64>()); + } } NDqProto::TData data; @@ -318,12 +408,12 @@ void TestSpillWithMockStorage(TTestContext& ctx) { auto storage = MakeIntrusive<TMockChannelStorage>(100'500ul); settings.ChannelStorage = storage; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 35; ++i) { auto row = ctx.CreateRow(i); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } UNIT_ASSERT_VALUES_EQUAL(35, ch->GetValuesCount()); @@ -338,16 +428,9 @@ void TestSpillWithMockStorage(TTestContext& ctx) { NDqProto::TData data; while (ch->Pop(data)) { - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - - UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size()); - for (ui32 i = 0; i < data.GetRows(); ++i) { - auto j = loadedRows + i; - UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>()); - } - + TUnboxedValueBatch buffer(ctx.GetOutputType()); + ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, loadedRows, data.GetRows()); loadedRows += data.GetRows(); } UNIT_ASSERT_VALUES_EQUAL(35, loadedRows); @@ -360,23 +443,16 @@ void TestSpillWithMockStorage(TTestContext& ctx) { for (i32 i = 100; i < 105; ++i) { auto row = ctx.CreateRow(i); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount()); NDqProto::TData data; while (ch->Pop(data)) { - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - - UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size()); - for (ui32 i = 0; i < data.GetRows(); ++i) { - auto j = 100 + loadedRows + i; - UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>()); - } - + TUnboxedValueBatch buffer(ctx.GetOutputType()); + ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, loadedRows + 100, data.GetRows()); loadedRows += data.GetRows(); } UNIT_ASSERT_VALUES_EQUAL(5, loadedRows); @@ -394,12 +470,12 @@ void TestOverflowWithMockStorage(TTestContext& ctx) { auto storage = MakeIntrusive<TMockChannelStorage>(500ul); settings.ChannelStorage = storage; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 42; ++i) { auto row = ctx.CreateRow(i); UNIT_ASSERT(!ch->IsFull()); - ch->Push(std::move(row)); + PushRow(ctx, std::move(row), ch); } UNIT_ASSERT_VALUES_EQUAL(42, ch->GetStats()->RowsIn); @@ -407,7 +483,7 @@ void TestOverflowWithMockStorage(TTestContext& ctx) { // UNIT_ASSERT(ch->IsFull()); it can be false-negative with storage enabled try { - ch->Push(ctx.CreateBigRow(0, 100'500)); + PushRow(ctx, ctx.CreateBigRow(0, 100'500), ch); UNIT_FAIL(""); } catch (yexception &e) { UNIT_ASSERT(TString(e.what()).Contains("Space limit exceeded")); @@ -439,7 +515,36 @@ Y_UNIT_TEST(PopAll) { } Y_UNIT_TEST(BigRow) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); + TTestContext ctx(NARROW_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); + TestBigRow(ctx); +} + +} + +Y_UNIT_TEST_SUITE(DqOutputWideChannelTests) { + +Y_UNIT_TEST(SingleRead) { + TTestContext ctx(WIDE_CHANNEL); + TestSingleRead(ctx); +} + +Y_UNIT_TEST(PartialRead) { + TTestContext ctx(WIDE_CHANNEL); + TestPartialRead(ctx); +} + +Y_UNIT_TEST(Overflow) { + TTestContext ctx(WIDE_CHANNEL); + TestOverflow(ctx); +} + +Y_UNIT_TEST(PopAll) { + TTestContext ctx(WIDE_CHANNEL); + TestPopAll(ctx); +} + +Y_UNIT_TEST(BigRow) { + TTestContext ctx(WIDE_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); TestBigRow(ctx); } @@ -453,7 +558,21 @@ Y_UNIT_TEST(Spill) { } Y_UNIT_TEST(Overflow) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); + TTestContext ctx(NARROW_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); + TestOverflowWithMockStorage(ctx); +} + +} + +Y_UNIT_TEST_SUITE(DqOutputWideChannelWithStorageTests) { + +Y_UNIT_TEST(Spill) { + TTestContext ctx(WIDE_CHANNEL); + TestSpillWithMockStorage(ctx); +} + +Y_UNIT_TEST(Overflow) { + TTestContext ctx(WIDE_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); TestOverflowWithMockStorage(ctx); } diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp index da9e8315c0b..a74f6787386 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp @@ -27,6 +27,12 @@ public: return AnyOf(Consumers, [](const auto& consumer) { return consumer->IsFull(); }); } + void WideConsume(TUnboxedValue* values, ui32 count) override { + Y_UNUSED(values); + Y_UNUSED(count); + YQL_ENSURE(false, "WideConsume is not supported"); + } + void Consume(TUnboxedValue&& value) override { if (Consumers.size() == 1) { Consumers[0]->Consume(std::move(value)); @@ -68,6 +74,10 @@ public: Output->Push(std::move(value)); } + void WideConsume(TUnboxedValue* values, ui32 count) override { + Output->WidePush(values, count); + } + void Consume(NDqProto::TCheckpoint&& checkpoint) override { Output->Push(std::move(checkpoint)); } @@ -84,6 +94,7 @@ class TDqOutputHashPartitionConsumer : public IDqOutputConsumer { private: mutable bool IsWaitingFlag = false; mutable TUnboxedValue WaitingValue; + mutable TUnboxedValueVector WideWaitingValues; mutable IDqOutput::TPtr OutputWaiting; protected: void DrainWaiting() const { @@ -102,16 +113,22 @@ protected: } public: TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs, - TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices) + TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, + TMaybe<ui32> outputWidth) : Outputs(std::move(outputs)) , KeyColumnIndices(std::move(keyColumnIndices)) , ValueHashers(KeyColumnIndices.size(), NUdf::IHash::TPtr{}) + , OutputWidth(outputWidth) { MKQL_ENSURE_S(keyColumnTypes.size() == KeyColumnIndices.size()); for (auto i = 0U; i < keyColumnTypes.size(); i++) { ValueHashers[i] = MakeHashImpl(keyColumnTypes[i]); } + + if (outputWidth.Defined()) { + WideWaitingValues.resize(*outputWidth); + } } bool IsFull() const override { @@ -120,6 +137,7 @@ public: } void Consume(TUnboxedValue&& value) final { + YQL_ENSURE(!OutputWidth.Defined()); ui32 partitionIndex = GetHashPartitionIndex(value); if (Outputs[partitionIndex]->IsFull()) { YQL_ENSURE(!IsWaitingFlag); @@ -131,6 +149,21 @@ public: } } + void WideConsume(TUnboxedValue* values, ui32 count) final { + YQL_ENSURE(OutputWidth.Defined() && count == OutputWidth); + ui32 partitionIndex = GetHashPartitionIndex(values); + if (Outputs[partitionIndex]->IsFull()) { + YQL_ENSURE(!IsWaitingFlag); + IsWaitingFlag = true; + OutputWaiting = Outputs[partitionIndex]; + for (ui32 i = 0; i < count; ++i) { + WideWaitingValues[i] = std::move(values[i]); + } + } else { + Outputs[partitionIndex]->WidePush(values, count); + } + } + void Consume(NDqProto::TCheckpoint&& checkpoint) override { for (auto& output : Outputs) { output->Push(NDqProto::TCheckpoint(checkpoint)); @@ -155,6 +188,17 @@ private: return hash % Outputs.size(); } + size_t GetHashPartitionIndex(const TUnboxedValue* values) { + ui64 hash = 0; + + for (size_t keyId = 0; keyId < KeyColumnIndices.size(); keyId++) { + MKQL_ENSURE_S(KeyColumnIndices[keyId] < OutputWidth); + hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumnIndices[keyId]])); + } + + return hash % Outputs.size(); + } + ui64 HashColumn(size_t keyId, const TUnboxedValue& value) const { if (!value.HasValue()) { return 0; @@ -166,24 +210,38 @@ private: TVector<IDqOutput::TPtr> Outputs; TVector<ui32> KeyColumnIndices; TVector<NUdf::IHash::TPtr> ValueHashers; + const TMaybe<ui32> OutputWidth; }; class TDqOutputBroadcastConsumer : public IDqOutputConsumer { public: - TDqOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs) - : Outputs(std::move(outputs)) {} + TDqOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) + : Outputs(std::move(outputs)) + , OutputWidth(outputWidth) + , Tmp(outputWidth.Defined() ? *outputWidth : 0u) + { + } bool IsFull() const override { return AnyOf(Outputs, [](const auto& output) { return output->IsFull(); }); } void Consume(TUnboxedValue&& value) final { + YQL_ENSURE(!OutputWidth.Defined()); for (auto& output : Outputs) { TUnboxedValue copy{ value }; output->Push(std::move(copy)); } } + void WideConsume(TUnboxedValue* values, ui32 count) final { + YQL_ENSURE(OutputWidth.Defined() && OutputWidth == count); + for (auto& output : Outputs) { + std::copy(values, values + count, Tmp.begin()); + output->WidePush(Tmp.data(), count); + } + } + void Consume(NDqProto::TCheckpoint&& checkpoint) override { for (auto& output : Outputs) { output->Push(NDqProto::TCheckpoint(checkpoint)); @@ -198,6 +256,8 @@ public: private: TVector<IDqOutput::TPtr> Outputs; + const TMaybe<ui32> OutputWidth; + TUnboxedValueVector Tmp; }; } // namespace @@ -212,14 +272,14 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) { IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, - TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices) + TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth) { return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumnTypes), - std::move(keyColumnIndices)); + std::move(keyColumnIndices), outputWidth); } -IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs) { - return MakeIntrusive<TDqOutputBroadcastConsumer>(std::move(outputs)); +IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) { + return MakeIntrusive<TDqOutputBroadcastConsumer>(std::move(outputs), outputWidth); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h index cdfcec0eee3..8bb7ea1ece7 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.h +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h @@ -26,6 +26,7 @@ public: } virtual bool IsFull() const = 0; virtual void Consume(NKikimr::NUdf::TUnboxedValue&& value) = 0; + virtual void WideConsume(NKikimr::NUdf::TUnboxedValue* values, ui32 count) = 0; virtual void Consume(NDqProto::TCheckpoint&& checkpoint) = 0; virtual void Finish() = 0; bool IsFinishing() const { @@ -43,9 +44,9 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output); IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, - TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices); + TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth); -IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs); +IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 19ffc4651b1..6c1542a9998 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -163,6 +163,10 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) { + TMaybe<ui32> outputWidth; + if (type->IsMulti()) { + outputWidth = static_cast<const NMiniKQL::TMultiType*>(type)->GetElementsCount(); + } auto guard = typeEnv.BindAllocator(); switch (outputDesc.GetTypeCase()) { case NDqProto::TTaskOutput::kSink: @@ -186,11 +190,11 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu } return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumnTypes), - std::move(keyColumnIndices)); + std::move(keyColumnIndices), outputWidth); } case NDqProto::TTaskOutput::kBroadcast: { - return CreateOutputBroadcastConsumer(std::move(outputs)); + return CreateOutputBroadcastConsumer(std::move(outputs), outputWidth); } case NDqProto::TTaskOutput::kRangePartition: { @@ -665,6 +669,9 @@ public: AllocatedHolder->Output = nullptr; } else if (outputConsumers.size() == 1) { AllocatedHolder->Output = std::move(outputConsumers[0]); + if (entry->OutputItemTypes[0]->IsMulti()) { + AllocatedHolder->OutputWideType = static_cast<TMultiType*>(entry->OutputItemTypes[0]); + } } else { auto guard = BindAllocator(); AllocatedHolder->Output = CreateOutputMultiConsumer(std::move(outputConsumers)); @@ -888,6 +895,12 @@ private: return ERunStatus::PendingOutput; } } + + TUnboxedValueVector wideBuffer; + const bool isWide = AllocatedHolder->OutputWideType != nullptr; + if (isWide) { + wideBuffer.resize(AllocatedHolder->OutputWideType->GetElementsCount()); + } while (!AllocatedHolder->Output->IsFull()) { if (Y_UNLIKELY(CollectProfileStats)) { auto now = TInstant::Now(); @@ -896,11 +909,20 @@ private: } NUdf::TUnboxedValue value; - auto fetchStatus = AllocatedHolder->ResultStream.Fetch(value); + NUdf::EFetchStatus fetchStatus; + if (isWide) { + fetchStatus = AllocatedHolder->ResultStream.WideFetch(wideBuffer.data(), wideBuffer.size()); + } else { + fetchStatus = AllocatedHolder->ResultStream.Fetch(value); + } switch (fetchStatus) { case NUdf::EFetchStatus::Ok: { - AllocatedHolder->Output->Consume(std::move(value)); + if (isWide) { + AllocatedHolder->Output->WideConsume(wideBuffer.data(), wideBuffer.size()); + } else { + AllocatedHolder->Output->Consume(std::move(value)); + } break; } case NUdf::EFetchStatus::Finish: { @@ -964,6 +986,7 @@ private: THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform IDqOutputConsumer::TPtr Output; + TMultiType* OutputWideType = nullptr; NUdf::TUnboxedValue ResultStream; }; diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index bfcd936c405..8d66a49abae 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -35,6 +35,31 @@ NDqProto::TData SerializeValuePickleV1(const TType* type, const NUdf::TUnboxedVa } template<bool Fast> +NDqProto::TData SerializeBufferPickleV1(const TType* type, const TUnboxedValueBatch& buffer) { + using TPacker = TValuePackerTransport<Fast>; + + TPacker packer(/* stable */ false, type); + if (type->IsMulti()) { + buffer.ForEachRowWide([&packer](const auto* values, ui32 width) { + packer.AddWideItem(values, width); + }); + } else { + buffer.ForEachRow([&packer](const auto value) { + packer.AddItem(value); + }); + } + const auto& packResult = packer.Finish(); + + NDqProto::TData data; + data.SetTransportVersion(Fast ? NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); + data.MutableRaw()->reserve(packResult.Size()); + packResult.CopyTo(*data.MutableRaw()); + data.SetRows(buffer.RowCount()); + + return data; +} + +template<bool Fast> void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NUdf::TUnboxedValue& value, const THolderFactory& holderFactory) { @@ -45,7 +70,7 @@ void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NU template<bool Fast> void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType, - const THolderFactory& holderFactory, TUnboxedValueVector& buffer) + const THolderFactory& holderFactory, TUnboxedValueBatch& buffer) { using TPacker = TValuePackerTransport<Fast>; TPacker packer(/* stable */ false, itemType); @@ -71,8 +96,24 @@ NDqProto::TData TDqDataSerializer::Serialize(const NUdf::TUnboxedValue& value, c } } +NDqProto::TData TDqDataSerializer::Serialize(const NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, + const NKikimr::NMiniKQL::TType* itemType) const +{ + auto guard = TypeEnv.BindAllocator(); + switch (TransportVersion) { + case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED: + case NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0: + return SerializeBufferPickleV1<false>(itemType, buffer); + case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: + return SerializeBufferPickleV1<true>(itemType, buffer); + default: + YQL_ENSURE(false, "Unsupported TransportVersion"); + } + +} + void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* itemType, - TUnboxedValueVector& buffer) const + TUnboxedValueBatch& buffer) const { auto guard = TypeEnv.BindAllocator(); switch (data.GetTransportVersion()) { diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h index b22e947cb78..469383c8a13 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.h +++ b/ydb/library/yql/dq/runtime/dq_transport.h @@ -7,6 +7,7 @@ #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> @@ -23,6 +24,7 @@ public: NDqProto::EDataTransportVersion GetTransportVersion() const; NDqProto::TData Serialize(const NUdf::TUnboxedValue& value, const NKikimr::NMiniKQL::TType* itemType) const; + NDqProto::TData Serialize(const NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, const NKikimr::NMiniKQL::TType* itemType) const; template <class TForwardIterator> NDqProto::TData Serialize(TForwardIterator first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType) const { @@ -41,7 +43,7 @@ public: } void Deserialize(const NDqProto::TData& data, const NKikimr::NMiniKQL::TType* itemType, - NKikimr::NMiniKQL::TUnboxedValueVector& buffer) const; + NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) const; void Deserialize(const NDqProto::TData& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const; struct TEstimateSizeSettings { diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h index 96c8012df53..b88542f70fb 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h @@ -46,6 +46,225 @@ using TUnboxedValueDeque = std::deque<NUdf::TUnboxedValue, TMKQLAllocator<NUdf:: using TKeyPayloadPair = std::pair<NUdf::TUnboxedValue, NUdf::TUnboxedValue>; using TKeyPayloadPairVector = std::vector<TKeyPayloadPair, TMKQLAllocator<TKeyPayloadPair>>; +class TUnboxedValueBatch { + // TUnboxedValueBatch represents column values for RowCount rows + // If wide encoding is used and each row contains Width columns, Values consists of Width * RowCount items: + // first Width elements correspond to first row, + // second Width elements - to second row, etc + // For narrow encoding, each row is represented as a single item (a struct) - so Width is equal to 1 +private: + using TBottomType = TUnboxedValueVector; + using TTopType = std::deque<TBottomType>; + +public: + using value_type = NUdf::TUnboxedValue; + + explicit TUnboxedValueBatch(const TType* rowType = nullptr) + : Width_((rowType && rowType->IsMulti()) ? static_cast<const TMultiType*>(rowType)->GetElementsCount() : 1u) + , IsWide_(rowType && rowType->IsMulti()) + , PageSize_(GetPageSize(Width_)) + { + } + + TUnboxedValueBatch(const TUnboxedValueBatch& other) = default; + TUnboxedValueBatch& operator=(const TUnboxedValueBatch& other) = default; + + TUnboxedValueBatch(TUnboxedValueBatch&& other) + : Width_(other.Width_) + , IsWide_(other.IsWide_) + , PageSize_(other.PageSize_) + , Values_(std::move(other.Values_)) + , RowOffset_(other.RowOffset_) + , RowCount_(other.RowCount_) + { + other.clear(); + } + + inline void clear() { + Values_.clear(); + RowOffset_ = RowCount_ = 0; + } + + inline bool empty() const { + return RowCount_ == 0; + } + + inline void swap(TUnboxedValueBatch& other) { + std::swap(Width_, other.Width_); + std::swap(PageSize_, other.PageSize_); + std::swap(Values_, other.Values_); + std::swap(RowOffset_, other.RowOffset_); + std::swap(RowCount_, other.RowCount_); + } + + template<typename... TArgs> + void emplace_back(TArgs&&... args) { + MKQL_ENSURE(!IsWide(), "emplace_back() should not be used for wide batch"); + if (Values_.empty() || Values_.back().size() == Values_.back().capacity()) { + Values_.emplace_back(); + Values_.back().reserve(PageSize_); + } + Values_.back().emplace_back(std::forward<TArgs>(args)...); + RowCount_++; + } + + inline void push_back(const value_type& row) { + emplace_back(row); + } + + inline void push_back(value_type&& row) { + emplace_back(std::move(row)); + } + + template<typename TFunc> + auto ForEachRow(const TFunc& cb) const { + MKQL_ENSURE(!IsWide(), "ForEachRowWide() should be used instead"); + return DoForEachRow<const NUdf::TUnboxedValue, const TUnboxedValueBatch>(this, + [&cb](const NUdf::TUnboxedValue* values, ui32 width) { + Y_VERIFY_DEBUG(width == 1); + Y_VERIFY_DEBUG(values); + return cb(*values); + }); + } + + template<typename TFunc> + auto ForEachRow(const TFunc& cb) { + MKQL_ENSURE(!IsWide(), "ForEachRowWide() should be used instead"); + return DoForEachRow<NUdf::TUnboxedValue, TUnboxedValueBatch>(this, + [&cb](NUdf::TUnboxedValue* values, ui32 width) { + Y_VERIFY_DEBUG(width == 1); + Y_VERIFY_DEBUG(values); + return cb(*values); + }); + } + + template<typename TFunc> + auto ForEachRowWide(const TFunc& cb) const { + MKQL_ENSURE(IsWide(), "ForEachRow() should be used instead"); + return DoForEachRow<const NUdf::TUnboxedValue, const TUnboxedValueBatch>(this, cb); + } + + template<typename TFunc> + auto ForEachRowWide(const TFunc& cb) { + MKQL_ENSURE(IsWide(), "ForEachRow() should be used instead"); + return DoForEachRow<NUdf::TUnboxedValue, TUnboxedValueBatch>(this, cb); + } + + inline TMaybe<ui32> Width() const { + return IsWide_ ? Width_ : TMaybe<ui32>{}; + } + + inline bool IsWide() const { + return IsWide_; + } + + inline ui64 RowCount() const { + return RowCount_; + } + + const value_type* Head() const { + MKQL_ENSURE(RowCount_, "Head() on empty batch"); + return Width_ ? &Values_.front()[RowOffset_ * Width_] : nullptr; + } + + value_type* Head() { + MKQL_ENSURE(RowCount_, "Head() on empty batch"); + return Width_ ? &Values_.front()[RowOffset_ * Width_] : nullptr; + } + + inline void Pop(size_t rowCount = 1) { + MKQL_ENSURE(rowCount <= RowCount_, "Invalid arg"); + ui64 newStartOffset = (RowOffset_ + rowCount) * Width_; + while (newStartOffset >= PageSize_) { + MKQL_ENSURE_S(!Values_.empty()); + Values_.pop_front(); + newStartOffset -= PageSize_; + } + + RowOffset_ = Width_ ? newStartOffset / Width_ : 0; + RowCount_ -= rowCount; + } + + template<typename TFunc> + void PushRow(const TFunc& producer) { + ReserveNextRow(); + for (ui32 i = 0; i < Width_; ++i) { + Values_.back().emplace_back(producer(i)); + } + ++RowCount_; + } + + void PushRow(NUdf::TUnboxedValue* values, ui32 width) { + Y_VERIFY_DEBUG(width == Width_); + ReserveNextRow(); + for (ui32 i = 0; i < Width_; ++i) { + Values_.back().emplace_back(std::move(values[i])); + } + ++RowCount_; + } + +private: + static const size_t DesiredPageSize = 1024; + static inline size_t GetPageSize(size_t width) { + if (!width) { + return DesiredPageSize; + } + size_t pageSize = DesiredPageSize + width - 1; + return pageSize - pageSize % width; + } + + inline void ReserveNextRow() { + bool full = Width_ && (Values_.empty() || Values_.back().size() == PageSize_); + if (full) { + Values_.emplace_back(); + Values_.back().reserve(PageSize_); + } + } + + template<typename TValue, typename TParent, typename TFunc> + static auto DoForEachRow(TParent* parent, const TFunc& cb) { + using TReturn = typename std::result_of<TFunc(TValue*, ui32)>::type; + + auto currTop = parent->Values_.begin(); + + Y_VERIFY_DEBUG(parent->PageSize_ > parent->RowOffset_); + Y_VERIFY_DEBUG(parent->Width_ == 0 || (parent->PageSize_ - parent->RowOffset_) % parent->Width_ == 0); + + size_t valuesOnPage = parent->PageSize_ - parent->RowOffset_; + + TValue* values = (parent->Width_ && parent->RowCount_) ? currTop->data() + parent->RowOffset_ : nullptr; + for (size_t i = 0; i < parent->RowCount_; ++i) { + if constexpr (std::is_same_v<TReturn, bool>) { + if (!cb(values, parent->Width_)) { + return false; + } + } else { + static_assert(std::is_same_v<TReturn, void>, "Callback should either return bool or void"); + cb(values, parent->Width_); + } + values += parent->Width_; + valuesOnPage -= parent->Width_; + if (!valuesOnPage) { + valuesOnPage = parent->PageSize_; + ++currTop; + values = currTop->data(); + } + } + + if constexpr (std::is_same_v<TReturn, bool>) { + return true; + } + } + + ui32 Width_; + bool IsWide_; + size_t PageSize_; + + TTopType Values_; + ui64 RowOffset_ = 0; + ui64 RowCount_ = 0; +}; + inline int CompareValues(NUdf::EDataSlot type, bool asc, bool isOptional, const NUdf::TUnboxedValuePod& lhs, const NUdf::TUnboxedValuePod& rhs) { int cmp; if (isOptional) { diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp index 9c375c5d6a4..91684b1cbea 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp @@ -216,6 +216,16 @@ bool HasOptionalFields(const TType* type) { return HasOptionalFields(taggedType->GetBaseType()); } + case TType::EKind::Multi: { + auto multiType = static_cast<const TMultiType*>(type); + for (ui32 index = 0; index < multiType->GetElementsCount(); ++index) { + if (HasOptionalFields(multiType->GetElementType(index))) { + return true; + } + } + return false; + } + default: THROW yexception() << "Unsupported type: " << type->GetKindAsStr(); } @@ -889,6 +899,7 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) template<bool Fast> TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type) : Type_(type) + , Width_(type->IsMulti() ? static_cast<const TMultiType*>(type)->GetElementsCount() : TMaybe<ui32>()) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) { @@ -909,7 +920,7 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TStringBuf buf, const TH } template<bool Fast> -void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const { +void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { auto& s = IncrementalState_; ui64 len; ui32 topLength; @@ -930,9 +941,21 @@ void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFacto len = NDetails::GetRawData<ui64>(buf); } - result.reserve(len + result.size()); - for (ui64 i = 0; i < len; ++i) { - result.emplace_back(UnpackFromContigousBuffer<Fast>(itemType, buf, topLength, holderFactory, s)); + if (Type_->IsMulti()) { + auto multiType = static_cast<const TMultiType*>(Type_); + const ui32 width = multiType->GetElementsCount(); + Y_VERIFY_DEBUG(result.IsWide()); + Y_VERIFY_DEBUG(result.Width() == width); + for (ui64 i = 0; i < len; ++i) { + result.PushRow([&](ui32 j) { + return UnpackFromContigousBuffer<Fast>(multiType->GetElementType(j), buf, topLength, holderFactory, s); + }); + } + } else { + Y_VERIFY_DEBUG(!result.IsWide()); + for (ui64 i = 0; i < len; ++i) { + result.emplace_back(UnpackFromContigousBuffer<Fast>(itemType, buf, topLength, holderFactory, s)); + } } MKQL_ENSURE(buf.empty(), "Bad packed data. Not fully data read"); } @@ -953,17 +976,23 @@ const TPagedBuffer& TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValueP } template<bool Fast> +void TValuePackerTransport<Fast>::StartPack() { + Buffer_.Clear(); + if constexpr (Fast) { + // reserve place for list item count + Buffer_.ReserveHeader(sizeof(ItemCount_)); + } else { + IncrementalState_.OptionalUsageMask.Reset(); + Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE); + } +} + +template<bool Fast> TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TUnboxedValuePod& value) { + Y_VERIFY_DEBUG(!Type_->IsMulti()); const TType* itemType = Type_; if (!ItemCount_) { - Buffer_.Clear(); - if constexpr (Fast) { - // reserve place for list item count - Buffer_.ReserveHeader(sizeof(ItemCount_)); - } else { - IncrementalState_.OptionalUsageMask.Reset(); - Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE); - } + StartPack(); } PackImpl<Fast, false>(itemType, Buffer_, value, IncrementalState_); @@ -972,6 +1001,22 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TU } template<bool Fast> +TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 width) { + Y_VERIFY_DEBUG(Type_->IsMulti()); + Y_VERIFY_DEBUG(static_cast<const TMultiType*>(Type_)->GetElementsCount() == width); + const TMultiType* itemType = static_cast<const TMultiType*>(Type_); + if (!ItemCount_) { + StartPack(); + } + + for (ui32 i = 0; i < width; ++i) { + PackImpl<Fast, false>(itemType->GetElementType(i), Buffer_, values[i], IncrementalState_); + } + ++ItemCount_; + return *this; +} + +template<bool Fast> void TValuePackerTransport<Fast>::Clear() { Buffer_.Clear(); ItemCount_ = 0; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index 16c7d423f18..53710d46029 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -78,6 +78,7 @@ public: // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout TSelf& AddItem(const NUdf::TUnboxedValuePod& value); + TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count); size_t PackedSizeEstimate() const { return Buffer_.Size() + Buffer_.ReservedHeaderSize(); } @@ -89,11 +90,13 @@ public: // reference is valid till the next call to Pack() const TPagedBuffer& Pack(const NUdf::TUnboxedValuePod &value) const; NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const; - void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const; + void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; private: void BuildMeta(bool addItemCount) const; + void StartPack(); const TType* const Type_; + const TMaybe<ui32> Width_; ui64 ItemCount_ = 0; mutable TPagedBuffer Buffer_; mutable NDetails::TPackerState State_; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp index 1ceb8ec4047..b750210c2a7 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -548,13 +548,13 @@ protected: UNIT_ASSERT_VALUES_EQUAL(std::string_view(uVal.AsStringRef()), str); } - TUnboxedValueVector items; + TUnboxedValueBatch items; packer.UnpackBatch(serializedStr, HolderFactory, items); - UNIT_ASSERT_VALUES_EQUAL(items.size(), count); - for (auto& uVal : items) { - UNIT_ASSERT(uVal); - UNIT_ASSERT_VALUES_EQUAL(std::string_view(uVal.AsStringRef()), str); - } + UNIT_ASSERT_VALUES_EQUAL(items.RowCount(), count); + items.ForEachRow([&](const NUdf::TUnboxedValue& value) { + UNIT_ASSERT(value); + UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str); + }); } } diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp index 1d97b87d40b..6dd36387665 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp @@ -87,7 +87,7 @@ private: } } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final { if (Result) { const auto size = Result->size(); buffer.emplace_back(NKikimr::NMiniKQL::MakeString(std::string_view(*Result))); diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp index 402e9e73115..61e4be5668a 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp @@ -109,6 +109,7 @@ TFakeCASetup::~TFakeCASetup() { void TFakeCASetup::AsyncOutputWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint, bool finish) { Execute([&valueProducer, checkpoint, finish](TFakeActor& actor) { auto batch = valueProducer(actor.GetHolderFactory()); + Y_ASSERT(actor.DqAsyncOutput); actor.DqAsyncOutput->SendData(std::move(batch), 0, checkpoint, finish); }); diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index 90e08b96158..8bee231a43c 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -24,7 +24,7 @@ using TRuntimePtr = std::unique_ptr<NActors::TTestActorRuntime>; using TCallback = std::function<void(TFakeActor&)>; template<typename T> using TReadValueParser = std::function<std::vector<T>(const NUdf::TUnboxedValue&)>; -using TWriteValueProducer = std::function<NKikimr::NMiniKQL::TUnboxedValueVector(NKikimr::NMiniKQL::THolderFactory&)>; +using TWriteValueProducer = std::function<NKikimr::NMiniKQL::TUnboxedValueBatch(NKikimr::NMiniKQL::THolderFactory&)>; namespace { struct TEvPrivate { @@ -191,15 +191,15 @@ struct TFakeCASetup { NThreading::TFuture<bool> nextDataFuture; Execute([&result, &parser, freeSpace, &nextDataFutureOut, this](TFakeActor& actor) { TMaybe<TInstant> watermark; - NKikimr::NMiniKQL::TUnboxedValueVector buffer; + NKikimr::NMiniKQL::TUnboxedValueBatch buffer; bool finished = false; actor.DqAsyncInput->GetAsyncInputData(buffer, watermark, finished, freeSpace); - for (const auto& uv : buffer) { - for (const auto item : parser(uv)) { + buffer.ForEachRow([&](const NUdf::TUnboxedValue& value) { + for (const auto item : parser(value)) { result.emplace_back(item); } - } + }); if (watermark) { result.emplace_back(*watermark); diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp index cafacb757c1..1019e7d7031 100644 --- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp +++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp @@ -96,15 +96,13 @@ bool TProtoBuilder::WriteData(const NDqProto::TData& data, const std::function<b const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion); - TUnboxedValueVector buffer; + YQL_ENSURE(!ResultType->IsMulti()); + TUnboxedValueBatch buffer(ResultType); dataSerializer.Deserialize(data, ResultType, buffer); - for (const auto& item : buffer) { - if (!func(item)) { - return false; - } - } - return true; + return buffer.ForEachRow([&func](const auto& value) { + return func(value); + }); } bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) { @@ -115,13 +113,13 @@ bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::f const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion); + YQL_ENSURE(!ResultType->IsMulti()); + for (const auto& part : rows) { - TUnboxedValueVector buffer; + TUnboxedValueBatch buffer(ResultType); dataSerializer.Deserialize(part, ResultType, buffer); - for (const auto& item : buffer) { - if (!func(item)) { - return false; - } + if (!buffer.ForEachRow([&func](const auto& value) { return func(value); })) { + return false; } } return true; diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index b329f3e3474..d97928a6614 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -560,7 +560,7 @@ private: } auto guard = source.TypeEnv->BindAllocator(); TMaybe<TInstant> watermark; - NKikimr::NMiniKQL::TUnboxedValueVector batch; + NKikimr::NMiniKQL::TUnboxedValueBatch batch; bool finished = false; const i64 space = source.Source->GetAsyncInputData(batch, watermark, finished, freeSpace); const ui64 index = inputIndex; @@ -706,7 +706,7 @@ private: void SinkSend( ui64 index, - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TMaybe<NDqProto::TCheckpoint>&& checkpoint, i64 size, i64 checkpointSize, diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 655e31c44fb..5ce5c8ab5c6 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -369,18 +369,15 @@ public: auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(), NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED); - NKikimr::NMiniKQL::TUnboxedValueVector buffer; - buffer.reserve(request.GetData().GetRows()); + NKikimr::NMiniKQL::TUnboxedValueBatch buffer(source->GetInputType()); if (request.GetString().empty() && request.GetChunks() == 0) { dataSerializer.Deserialize(request.GetData(), source->GetInputType(), buffer); } else if (!request.GetString().empty()) { - buffer.reserve(request.GetString().size()); for (auto& row : request.GetString()) { buffer.emplace_back(NKikimr::NMiniKQL::MakeString(row)); } } else { i64 chunks = request.GetChunks(); - buffer.reserve(chunks); for (i64 i = 0; i < chunks; i++) { NDqProto::TSourcePushChunk chunk; chunk.Load(&input); @@ -634,25 +631,23 @@ public: request.Load(&input); auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit - NKikimr::NMiniKQL::TUnboxedValueVector batch; auto sink = Runner->GetSink(channelId); auto* outputType = sink->GetOutputType(); + NKikimr::NMiniKQL::TUnboxedValueBatch batch(outputType); + YQL_ENSURE(!batch.IsWide()); auto bytes = sink->Pop(batch, request.GetBytes()); NDqProto::TSinkPopResponse response; if (request.GetRaw()) { - for (auto& raw : batch) { - *response.AddString() = raw.AsStringRef(); - } + batch.ForEachRow([&response](const auto& value) { + *response.AddString() = value.AsStringRef(); + }); } else { NDq::TDqDataSerializer dataSerializer( Runner->GetTypeEnv(), Runner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); - - *response.MutableData() = dataSerializer.Serialize( - batch.begin(), batch.end(), - static_cast<NKikimr::NMiniKQL::TType*>(outputType)); + *response.MutableData() = dataSerializer.Serialize(batch, outputType); } response.SetBytes(bytes); response.Save(&output); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 722ce32ddbb..21294f30735 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -586,7 +586,7 @@ public: } [[nodiscard]] - bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override { + bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override { Y_UNUSED(batch); ythrow yexception() << "unimplemented"; } @@ -726,11 +726,11 @@ public: } } - void Push(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) override { + void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override { auto inputType = GetInputType(); NDqProto::TSourcePushRequest data; TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); - *data.MutableData() = dataSerializer.Serialize(batch.begin(), batch.end(), static_cast<NKikimr::NMiniKQL::TType*>(inputType)); + *data.MutableData() = dataSerializer.Serialize(batch, inputType); data.SetSpace(space); NDqProto::TCommandHeader header; @@ -740,10 +740,11 @@ public: header.SetChannelId(InputIndex); header.Save(&Output); data.Save(&Output); + } [[nodiscard]] - bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override { + bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override { Y_UNUSED(batch); ythrow yexception() << "unimplemented"; } @@ -912,6 +913,12 @@ public: ythrow yexception() << "unimplemented"; } + void WidePush(NUdf::TUnboxedValue* values, ui32 count) override { + Y_UNUSED(values); + Y_UNUSED(count); + ythrow yexception() << "unimplemented"; + } + void Push(NDqProto::TWatermark&& watermark) override { Y_UNUSED(watermark); ythrow yexception() << "unimplemented"; @@ -1087,7 +1094,7 @@ public: } } - ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) override { + ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) override { try { NDqProto::TCommandHeader header; header.SetVersion(5); @@ -1201,6 +1208,12 @@ public: Y_FAIL("Unimplemented"); } + void WidePush(NUdf::TUnboxedValue* values, ui32 count) override { + Y_UNUSED(values); + Y_UNUSED(count); + Y_FAIL("Unimplemented"); + } + void Push(NDqProto::TWatermark&& watermark) override { Y_UNUSED(watermark); Y_FAIL("Watermarks are not supported"); diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 1cac519e70b..6fb14025eba 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -247,7 +247,7 @@ private: void AsyncInputPush( ui64 cookie, ui64 index, - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finish) override { @@ -255,9 +255,10 @@ private: auto selfId = SelfId(); TVector<TString> strings; - for (auto& row : batch) { - strings.emplace_back(row.AsStringRef()); - } + YQL_ENSURE(!batch.IsWide()); + batch.ForEachRow([&strings](const auto& value) { + strings.emplace_back(value.AsStringRef()); + }); Invoker->Invoke([strings=std::move(strings),taskRunner=TaskRunner, actorSystem, selfId, cookie, parentId=ParentId, space, finish, index, settings=Settings, stageId=StageId]() mutable { try { @@ -363,7 +364,7 @@ private: void OnSinkPopFinished(TEvSinkPopFinished::TPtr& ev) { auto guard = TaskRunner->BindAllocator(); - NKikimr::NMiniKQL::TUnboxedValueVector batch; + NKikimr::NMiniKQL::TUnboxedValueBatch batch; for (auto& row: ev->Get()->Strings) { batch.emplace_back(NKikimr::NMiniKQL::MakeString(row)); } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index d97028c5ccf..8e49d47c742 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -126,8 +126,9 @@ namespace NYql::NDq { hFunc(TEvPrivate::TEvReadResult, Handle); hFunc(TEvPrivate::TEvReadError, Handle);) - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final { + YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported"); if (Result) { NUdf::TUnboxedValue value; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 55dbf335d1b..e415a2f75c8 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -277,7 +277,7 @@ private: } } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override { SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace); const auto now = TInstant::Now(); @@ -392,14 +392,15 @@ private: THashMap<NYdb::NPersQueue::TPartitionStream::TPtr, TList<std::pair<ui64, ui64>>> OffsetRanges; // [start, end) }; - bool MaybeReturnReadyBatch(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, i64& usedSpace) { + bool MaybeReturnReadyBatch(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, i64& usedSpace) { if (ReadyBuffer.empty()) { SubscribeOnNextEvent(); return false; } auto& readyBatch = ReadyBuffer.front(); - buffer.swap(readyBatch.Data); + buffer.clear(); + std::move(readyBatch.Data.begin(), readyBatch.Data.end(), std::back_inserter(buffer)); watermark = readyBatch.Watermark; usedSpace = readyBatch.UsedSpace; @@ -419,7 +420,7 @@ private: } SRC_LOG_T("Return ready batch." - << " DataCount = " << buffer.size() + << " DataCount = " << buffer.RowCount() << " Watermark = " << (watermark ? ToString(*watermark) : "none") << " Used space = " << usedSpace); return true; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 434e37e6780..c9d763b8581 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -122,12 +122,12 @@ public: public: void SendData( - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 dataSize, const TMaybe<NDqProto::TCheckpoint>& checkpoint, bool finished) override { - SINK_LOG_T("SendData. Batch: " << batch.size() + SINK_LOG_T("SendData. Batch: " << batch.RowCount() << ". Checkpoint: " << checkpoint.Defined() << ". Finished: " << finished); Y_UNUSED(dataSize); @@ -138,17 +138,18 @@ public: CreateSessionIfNotExists(); - for (const NUdf::TUnboxedValue& item : batch) { - if (!item.IsBoxed()) { + Y_VERIFY(!batch.IsWide(), "Wide batch is not supported"); + if (!batch.ForEachRow([&](const auto& value) { + if (!value.IsBoxed()) { Fail("Struct with single field was expected"); - return; + return false; } - const NUdf::TUnboxedValue dataCol = item.GetElement(0); + const NUdf::TUnboxedValue dataCol = value.GetElement(0); if (!dataCol.IsString() && !dataCol.IsEmbedded()) { Fail(TStringBuilder() << "Non string value could not be written to YDS stream"); - return; + return false; } TString data(dataCol.AsStringRef()); @@ -160,11 +161,14 @@ public: if (messageSize > MaxMessageSize) { Fail(TStringBuilder() << "Max message size for YDS is " << MaxMessageSize << " bytes but received message with size of " << messageSize << " bytes"); - return; + return false; } FreeSpace -= messageSize; Buffer.push(std::move(data)); + return true; + })) { + return; } if (checkpoint) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 48a24c7091f..2ba03ea7d06 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -838,10 +838,9 @@ private: } } - i64 GetAsyncInputData(TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { i64 total = 0LL; if (!Blocks.empty()) { - buffer.reserve(buffer.size() + Blocks.size()); do { auto& content = std::get<IHTTPGateway::TContent>(Blocks.front()); const auto size = content.size(); @@ -2462,7 +2461,7 @@ private: LOG_D("TS3StreamReadActor", "Memory usage. Ready blocks: " << Blocks.size() << ". Ready blocks total size: " << blocksTotalSize); } - i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final { + i64 GetAsyncInputData(TUnboxedValueBatch& output, TMaybe<TInstant>&, bool& finished, i64 free) final { ReportMemoryUsage(); i64 total = 0LL; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index d953464688c..0493626a1ae 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -498,10 +498,11 @@ private: hFunc(TEvPrivate::TEvUploadFinished, Handle); ) - void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { + void SendData(TUnboxedValueBatch&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { std::unordered_set<TS3FileWriteActor*> processedActors; - for (const auto& v : data) { - const auto& key = MakePartitionKey(v); + YQL_ENSURE(!data.IsWide(), "Wide stream is not supported yet"); + data.ForEachRow([&](const auto& row) { + const auto& key = MakePartitionKey(row); const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>()); if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) { auto fileWrite = std::make_unique<TS3FileWriteActor>( @@ -516,7 +517,7 @@ private: RegisterWithSameMailbox(fileWrite.release()); } - const NUdf::TUnboxedValue& value = Keys.empty() ? v : *v.GetElements(); + const NUdf::TUnboxedValue& value = Keys.empty() ? row : *row.GetElements(); TS3FileWriteActor* actor = keyIt->second.back(); if (value) { actor->AddData(TString(value.AsStringRef())); @@ -525,7 +526,7 @@ private: actor->Seal(); } processedActors.insert(actor); - } + }); for (TS3FileWriteActor* actor : processedActors) { actor->Go(); diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index c6c216fb083..d59e49f748a 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -151,12 +151,13 @@ public: public: void SendData( - NKikimr::NMiniKQL::TUnboxedValueVector&& batch, + NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64, const TMaybe<NDqProto::TCheckpoint>& checkpoint, bool finished) override { - SINK_LOG_T("Got " << batch.size() << " items to send. Checkpoint: " << checkpoint.Defined() + YQL_ENSURE(!batch.IsWide(), "Wide streams are not supported"); + SINK_LOG_T("Got " << batch.RowCount() << " items to send. Checkpoint: " << checkpoint.Defined() << ". Send queue: " << SendingBuffer.size() << ". Inflight: " << InflightBuffer.size() << ". Checkpoint in progress: " << CheckpointInProgress.has_value()); @@ -165,13 +166,13 @@ public: } ui64 metricsCount = 0; - for (const auto& item : batch) { + batch.ForEachRow([&](const auto& value) { if (metricsCount + WriteParams.Shard.GetScheme().GetSensors().size() > MaxMetricsPerRequest) { PushMetricsToBuffer(metricsCount); } - metricsCount += UserMetricsEncoder.Append(item); - } + metricsCount += UserMetricsEncoder.Append(value); + }); if (metricsCount != 0) { PushMetricsToBuffer(metricsCount); diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp index 48c4bebd2f9..745d4a4a41d 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp @@ -126,10 +126,9 @@ private: TActorBootstrapped<TYdbReadActor>::PassAway(); } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { i64 total = 0LL; if (!Blocks.empty()) { - buffer.reserve(buffer.size() + Blocks.size()); do { const auto size = Blocks.front().size(); buffer.emplace_back(NKikimr::NMiniKQL::MakeString(Blocks.front())); |