diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-15 14:50:51 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-15 14:50:51 +0300 |
commit | 60305e34097f8abe3be77c5d7f04a4f0ecf98844 (patch) | |
tree | 242b6ff1a6e4c4cf46c3e5eefdef4ff019844566 | |
parent | f0241f6a7eaadf213742c401c1aed515e29f6f63 (diff) | |
download | ydb-60305e34097f8abe3be77c5d7f04a4f0ecf98844.tar.gz |
Account only logically consumed rows
Это быстрофикс проблемы с биллингом.
По уму надо либо биллинг менять, либо делать отдельный callable для подсчета строк в yql
12 files changed, 142 insertions, 26 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 8ab19ba2e9..22cef80a53 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -978,6 +978,10 @@ public: YQL_ENSURE(packed == 0); if (Settings.ColumnsSize() == 0) { batch->resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer()); + for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { + // min row size according to datashard + RowsSizeCollector.Add(8); + } } else { TVector<NUdf::TUnboxedValue*> editAccessors(result->Get()->GetRowsCount()); batch->reserve(result->Get()->GetRowsCount()); @@ -1006,6 +1010,10 @@ public: columnIndex += 1; } } + + for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { + RowsSizeCollector.Add(GetRowSize((*batch)[rowIndex].GetElements()).DataBytes); + } } if (!hasResultColumns) { @@ -1052,6 +1060,7 @@ public: columnIndex += 1; } } + // min row size according to datashard rowSize = std::max(rowSize, (i64)8); columnIndex = 0; @@ -1066,6 +1075,7 @@ public: } } + RowsSizeCollector.Add(rowSize); stats.DataBytes += rowSize; stats.AllocatedBytes += GetRowSize(rowItems).AllocatedBytes; freeSpace -= rowSize; @@ -1195,7 +1205,7 @@ public: return bytes; } - void FillExtraStats(NDqProto::TDqTaskStats* stats, bool last) override { + void FillExtraStats(NDqProto::TDqTaskStats* stats, bool last, const NYql::NDq::TDqBillingStats* billing) override { if (last) { stats->SetErrorsCount(ErrorsCount); @@ -1212,10 +1222,17 @@ public: } - //FIXME: use evread statistics after KIKIMR-16924 - tableStats->SetReadRows(tableStats->GetReadRows() + ReceivedRowCount); - tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes); + auto consumedRows = billing ? billing->Inputs[InputIndex]->RowsConsumed : ReceivedRowCount; + + //FIXME: use real rows count + tableStats->SetReadRows(tableStats->GetReadRows() + consumedRows); + tableStats->SetReadBytes(tableStats->GetReadBytes() + RowsSizeCollector.RowsSize(consumedRows)); tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size()); + + //FIXME: use evread statistics after KIKIMR-16924 + //tableStats->SetReadRows(tableStats->GetReadRows() + ReceivedRowCount); + //tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes); + //tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size()); } } @@ -1284,6 +1301,24 @@ public: } private: + struct TRowSizeCollector { + void Add(ui64 sz) { + Prev = Sizes.emplace_back(Prev + sz); + } + + ui64 RowsSize(ui64 processed) { + if (processed > 0) { + return Sizes[processed - 1]; + } else { + return 0; + } + } + + ui64 Prev = 0; + std::vector<ui64> Sizes; + } RowsSizeCollector; + + NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings; TVector<NScheme::TTypeInfo> KeyColumnTypes; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index a8edf455bd..216684aa4d 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -86,7 +86,7 @@ public: return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR; } - void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last) override { + void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqBillingStats*) override { if (last) { stats->SetErrorsCount(ErrorsCount); diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 12b95f11d9..ddabd482c8 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -2881,9 +2881,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(DeleteWithInputMultiConsumptionLimit) { - NKikimrConfig::TAppConfig app; - app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(app)); + TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index b147b070e6..069121c715 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -2,6 +2,8 @@ #include <ydb/library/yql/dq/actors/dq_events_ids.h> #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/dq/runtime/dq_output_consumer.h> +#include <ydb/library/yql/dq/runtime/dq_async_input.h> +#include <ydb/library/yql/dq/runtime/dq_input_producer.h> #include <ydb/library/yql/dq/runtime/dq_async_output.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -93,7 +95,7 @@ struct IDqComputeActorAsyncInput { virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; } - virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */) { } + virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */, const NYql::NDq::TDqBillingStats*) { } // The same signature as IActor::PassAway(). // It is guaranted that this method will be called with bound MKQL allocator. diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 5281bcd3b2..be26135faa 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1939,7 +1939,8 @@ public: auto ingressBytes = sourceInfo.AsyncInput->GetIngressBytes(); ingressBytesMap.emplace(inputIndex, ingressBytes); Ingress[sourceInfo.Type] = Ingress.Value(sourceInfo.Type, 0) + ingressBytes; - source->FillExtraStats(protoTask, last); + // TODO: support async CA + source->FillExtraStats(protoTask, last, TaskRunner ? TaskRunner->GetBillingStats() : nullptr); } } FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) GetProfileStats(), ingressBytesMap); @@ -1993,7 +1994,8 @@ public: } if (auto* transform = transformInfo.AsyncInput) { - transform->FillExtraStats(protoTask, last); + // TODO: support async CA + transform->FillExtraStats(protoTask, last, TaskRunner ? TaskRunner->GetBillingStats() : 0); } } diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index 71f64e30a5..fdd10d09c2 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -14,10 +14,12 @@ namespace { class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamValue> { public: - TDqInputUnionStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs) + TDqInputUnionStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs, TDqBillingStats::TInputStats* stats) : TComputationValue<TDqInputUnionStreamValue>(memInfo) , Inputs(std::move(inputs)) - , CurrentItemIndex(0) {} + , CurrentItemIndex(0) + , Stats(stats) + {} private: NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final { @@ -33,6 +35,9 @@ private: } result = std::move(CurrentBuffer[CurrentItemIndex]); + if (Stats) { + Stats->RowsConsumed += 1; + } ++CurrentItemIndex; return NUdf::EFetchStatus::Ok; } @@ -56,15 +61,17 @@ private: TVector<IDqInput::TPtr> Inputs; TUnboxedValueVector CurrentBuffer; ui64 CurrentItemIndex; + TDqBillingStats::TInputStats* Stats; }; class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamValue> { public: TDqInputMergeStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs, - TVector<TSortColumnInfo>&& sortCols) + TVector<TSortColumnInfo>&& sortCols, TDqBillingStats::TInputStats* stats) : TComputationValue<TDqInputMergeStreamValue>(memInfo) , Inputs(std::move(inputs)) , SortCols(std::move(sortCols)) + , Stats(stats) { CurrentBuffers.resize(Inputs.size()); CurrentItemIndexes.reserve(Inputs.size()); @@ -136,6 +143,9 @@ private: return status; } + if (Stats) { + Stats->RowsConsumed += 1; + } result = std::move(FindResult()); return NUdf::EFetchStatus::Ok; } @@ -211,20 +221,21 @@ private: TVector<TUnboxedValuesIterator> CurrentItemIndexes; ui32 InitializationIndex = 0; TMap<ui32, EDataSlot> SortColTypes; + TDqBillingStats::TInputStats* Stats; }; } // namespace NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs, - const NMiniKQL::THolderFactory& factory) + const NMiniKQL::THolderFactory& factory, TDqBillingStats::TInputStats* stats) { - return factory.Create<TDqInputUnionStreamValue>(std::move(inputs)); + return factory.Create<TDqInputUnionStreamValue>(std::move(inputs), stats); } NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs, - TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory) + TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqBillingStats::TInputStats* stats) { - return factory.Create<TDqInputMergeStreamValue>(std::move(inputs), std::move(sortCols)); + return factory.Create<TDqInputMergeStreamValue>(std::move(inputs), std::move(sortCols), stats); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.h b/ydb/library/yql/dq/runtime/dq_input_producer.h index 7a590b23be..bc34c0f0a7 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.h +++ b/ydb/library/yql/dq/runtime/dq_input_producer.h @@ -5,10 +5,22 @@ namespace NYql::NDq { +struct TDqBillingStats { + struct TInputStats { + ui64 RowsConsumed; + }; + std::vector<std::unique_ptr<TInputStats>> Inputs; + + TInputStats& AddInputs() { + Inputs.push_back(std::make_unique<TInputStats>()); + return *Inputs.back(); + } +}; + NKikimr::NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs, - const NKikimr::NMiniKQL::THolderFactory& holderFactory); + const NKikimr::NMiniKQL::THolderFactory& holderFactory, TDqBillingStats::TInputStats*); NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs, - TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory); + TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqBillingStats::TInputStats*); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index c00ee72aa5..088daf74a8 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -139,21 +139,21 @@ void ValidateParamValue(std::string_view paramName, const TType* type, const NUd #define LOG(...) do { if (Y_UNLIKELY(LogFunc)) { LogFunc(__VA_ARGS__); } } while (0) NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type, - TVector<IDqInput::TPtr>&& inputs, const THolderFactory& holderFactory) + TVector<IDqInput::TPtr>&& inputs, const THolderFactory& holderFactory, TDqBillingStats::TInputStats* stats) { switch (inputDesc.GetTypeCase()) { case NYql::NDqProto::TTaskInput::kSource: Y_VERIFY(inputs.size() == 1); [[fallthrough]]; case NYql::NDqProto::TTaskInput::kUnionAll: - return CreateInputUnionValue(std::move(inputs), holderFactory); + return CreateInputUnionValue(std::move(inputs), holderFactory, stats); case NYql::NDqProto::TTaskInput::kMerge: { const auto& protoSortCols = inputDesc.GetMerge().GetSortColumns(); TVector<TSortColumnInfo> sortColsInfo; GetColumnsInfo(type, protoSortCols, sortColsInfo); YQL_ENSURE(!sortColsInfo.empty()); - return CreateInputMergeValue(std::move(inputs), std::move(sortColsInfo), holderFactory); + return CreateInputMergeValue(std::move(inputs), std::move(sortColsInfo), holderFactory, stats); } default: YQL_ENSURE(false, "Unknown input type: " << (ui32) inputDesc.GetTypeCase()); @@ -266,6 +266,10 @@ public: } } + const TDqBillingStats* GetBillingStats() const override { + return &BillingStats; + } + ui64 GetTaskId() const override { Y_VERIFY(TaskId, "Not prepared yet"); return TaskId; @@ -515,6 +519,7 @@ public: for (ui32 i = 0; i < task.InputsSize(); ++i) { auto& inputDesc = task.GetInputs(i); + auto& inputStats = BillingStats.AddInputs(); TVector<IDqInput::TPtr> inputs{Reserve(std::max<ui64>(inputDesc.ChannelsSize(), 1))}; // 1 is for "source" type of input. TInputTransformInfo* transform = nullptr; @@ -572,14 +577,14 @@ public: auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, true); if (transform) { - transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory); + transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, nullptr); inputs.clear(); inputs.emplace_back(transform->TransformOutput); entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), - CreateInputUnionValue(std::move(inputs), holderFactory)); + CreateInputUnionValue(std::move(inputs), holderFactory, &inputStats)); } else { entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), - DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory)); + DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory, &inputStats)); } } @@ -978,6 +983,7 @@ private: bool CollectBasicStats = false; bool CollectProfileStats = false; std::unique_ptr<TDqTaskRunnerStats> Stats; + TDqBillingStats BillingStats; TDuration RunComputeTime; private: diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 8ce147c3dd..1c0a0a9043 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -5,6 +5,7 @@ #include <ydb/library/yql/dq/runtime/dq_async_output.h> #include <ydb/library/yql/dq/runtime/dq_compute.h> #include <ydb/library/yql/dq/runtime/dq_input_channel.h> +#include <ydb/library/yql/dq/runtime/dq_input_producer.h> #include <ydb/library/yql/dq/runtime/dq_output_channel.h> #include <ydb/library/yql/dq/runtime/dq_output_consumer.h> #include <ydb/library/yql/dq/runtime/dq_async_input.h> @@ -319,6 +320,7 @@ public: virtual void UpdateStats() = 0; virtual const TDqTaskRunnerStats* GetStats() const = 0; + virtual const TDqBillingStats* GetBillingStats() const = 0; [[nodiscard]] virtual TString Save() const = 0; diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto index 62e029b30c..c91962581c 100644 --- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto +++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto @@ -43,6 +43,8 @@ message TCommandHeader { SINK_IS_FINISHED = 24; // Header -> TIsFinishedResponse SINK_OUTPUT_TYPE = 25; // Header -> TGetTypeResponse SINK_STATS = 26; // Header -> TGetSinkStatsResponse + + GET_BILLING_STATS = 27; }; int32 Version = 1; @@ -66,6 +68,13 @@ message TPopResponse { google.protobuf.Any Stats = 4; } +message TBillingStatsResponse { + message TInputStats { + uint64 RowsConsumed = 1; + }; + repeated TInputStats Inputs = 1; +}; + message TSinkPopRequest { uint64 Bytes = 1; bool Raw = 2; diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 11314e9fe1..848cb287c6 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -244,6 +244,16 @@ public: } _exit(127); } + + NDqProto::TBillingStatsResponse GetBillingStats() { + NDqProto::TBillingStatsResponse resp; + auto* stats = Runner->GetBillingStats(); + for (auto& input : stats->Inputs) { + auto* i = resp.AddInputs(); + i->SetRowsConsumed(input->RowsConsumed); + } + return resp; + } NDqProto::TGetStatsResponse GetStats(ui64 taskId) { const auto stats = Runner->GetStats(); @@ -585,6 +595,12 @@ public: GetStats(taskId).Save(&output); break; } + case NDqProto::TCommandHeader::GET_BILLING_STATS: { + Y_ENSURE(header.GetVersion() >= 3); + Y_ENSURE(taskId == Runner->GetTaskId()); + GetBillingStats().Save(&output); + break; + } case NDqProto::TCommandHeader::GET_STATS_INPUT: { Y_ENSURE(header.GetVersion() >= 3); Y_ENSURE(taskId == Runner->GetTaskId()); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index f8efe2d3d3..e37ce4f05d 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1607,6 +1607,28 @@ public: void UpdateStats() override { } + const TDqBillingStats* GetBillingStats() const override { + try { + NDqProto::TCommandHeader header; + header.SetVersion(3); + header.SetCommand(NDqProto::TCommandHeader::GET_BILLING_STATS); + header.SetTaskId(Task.GetId()); + header.Save(&Delegate->GetOutput()); + + NDqProto::TBillingStatsResponse response; + response.Load(&Delegate->GetInput()); + + BillingStats.Inputs.clear(); + for (auto input : response.GetInputs()) { + auto i = BillingStats.AddInputs(); + i.RowsConsumed = input.GetRowsConsumed(); + } + return &BillingStats; + } catch (...) { + Delegate->RaiseException(); + } + } + const TDqTaskRunnerStats* GetStats() const override { try { @@ -1648,6 +1670,7 @@ private: TIntrusivePtr<TTaskRunner> Delegate; NDqProto::TDqTask Task; mutable TDqTaskRunnerStats Stats; + mutable TDqBillingStats BillingStats; THashMap<ui64, IDqInputChannel::TPtr> InputChannels; THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources; |