aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-15 14:50:51 +0300
committerssmike <ssmike@ydb.tech>2023-03-15 14:50:51 +0300
commit60305e34097f8abe3be77c5d7f04a4f0ecf98844 (patch)
tree242b6ff1a6e4c4cf46c3e5eefdef4ff019844566
parentf0241f6a7eaadf213742c401c1aed515e29f6f63 (diff)
downloadydb-60305e34097f8abe3be77c5d7f04a4f0ecf98844.tar.gz
Account only logically consumed rows
Это быстрофикс проблемы с биллингом. По уму надо либо биллинг менять, либо делать отдельный callable для подсчета строк в yql
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp43
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp2
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h6
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_producer.cpp25
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_producer.h16
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp18
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h2
-rw-r--r--ydb/library/yql/providers/dq/api/protos/task_command_executor.proto9
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp16
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp23
12 files changed, 142 insertions, 26 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 8ab19ba2e9..22cef80a53 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -978,6 +978,10 @@ public:
YQL_ENSURE(packed == 0);
if (Settings.ColumnsSize() == 0) {
batch->resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer());
+ for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
+ // min row size according to datashard
+ RowsSizeCollector.Add(8);
+ }
} else {
TVector<NUdf::TUnboxedValue*> editAccessors(result->Get()->GetRowsCount());
batch->reserve(result->Get()->GetRowsCount());
@@ -1006,6 +1010,10 @@ public:
columnIndex += 1;
}
}
+
+ for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
+ RowsSizeCollector.Add(GetRowSize((*batch)[rowIndex].GetElements()).DataBytes);
+ }
}
if (!hasResultColumns) {
@@ -1052,6 +1060,7 @@ public:
columnIndex += 1;
}
}
+ // min row size according to datashard
rowSize = std::max(rowSize, (i64)8);
columnIndex = 0;
@@ -1066,6 +1075,7 @@ public:
}
}
+ RowsSizeCollector.Add(rowSize);
stats.DataBytes += rowSize;
stats.AllocatedBytes += GetRowSize(rowItems).AllocatedBytes;
freeSpace -= rowSize;
@@ -1195,7 +1205,7 @@ public:
return bytes;
}
- void FillExtraStats(NDqProto::TDqTaskStats* stats, bool last) override {
+ void FillExtraStats(NDqProto::TDqTaskStats* stats, bool last, const NYql::NDq::TDqBillingStats* billing) override {
if (last) {
stats->SetErrorsCount(ErrorsCount);
@@ -1212,10 +1222,17 @@ public:
}
- //FIXME: use evread statistics after KIKIMR-16924
- tableStats->SetReadRows(tableStats->GetReadRows() + ReceivedRowCount);
- tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes);
+ auto consumedRows = billing ? billing->Inputs[InputIndex]->RowsConsumed : ReceivedRowCount;
+
+ //FIXME: use real rows count
+ tableStats->SetReadRows(tableStats->GetReadRows() + consumedRows);
+ tableStats->SetReadBytes(tableStats->GetReadBytes() + RowsSizeCollector.RowsSize(consumedRows));
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size());
+
+ //FIXME: use evread statistics after KIKIMR-16924
+ //tableStats->SetReadRows(tableStats->GetReadRows() + ReceivedRowCount);
+ //tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes);
+ //tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size());
}
}
@@ -1284,6 +1301,24 @@ public:
}
private:
+ struct TRowSizeCollector {
+ void Add(ui64 sz) {
+ Prev = Sizes.emplace_back(Prev + sz);
+ }
+
+ ui64 RowsSize(ui64 processed) {
+ if (processed > 0) {
+ return Sizes[processed - 1];
+ } else {
+ return 0;
+ }
+ }
+
+ ui64 Prev = 0;
+ std::vector<ui64> Sizes;
+ } RowsSizeCollector;
+
+
NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index a8edf455bd..216684aa4d 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -86,7 +86,7 @@ public:
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
}
- void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last) override {
+ void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqBillingStats*) override {
if (last) {
stats->SetErrorsCount(ErrorsCount);
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 12b95f11d9..ddabd482c8 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -2881,9 +2881,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(DeleteWithInputMultiConsumptionLimit) {
- NKikimrConfig::TAppConfig app;
- app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
- TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(app));
+ TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index b147b070e6..069121c715 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -2,6 +2,8 @@
#include <ydb/library/yql/dq/actors/dq_events_ids.h>
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
+#include <ydb/library/yql/dq/runtime/dq_async_input.h>
+#include <ydb/library/yql/dq/runtime/dq_input_producer.h>
#include <ydb/library/yql/dq/runtime/dq_async_output.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -93,7 +95,7 @@ struct IDqComputeActorAsyncInput {
virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; }
- virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */) { }
+ virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */, const NYql::NDq::TDqBillingStats*) { }
// The same signature as IActor::PassAway().
// It is guaranted that this method will be called with bound MKQL allocator.
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 5281bcd3b2..be26135faa 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1939,7 +1939,8 @@ public:
auto ingressBytes = sourceInfo.AsyncInput->GetIngressBytes();
ingressBytesMap.emplace(inputIndex, ingressBytes);
Ingress[sourceInfo.Type] = Ingress.Value(sourceInfo.Type, 0) + ingressBytes;
- source->FillExtraStats(protoTask, last);
+ // TODO: support async CA
+ source->FillExtraStats(protoTask, last, TaskRunner ? TaskRunner->GetBillingStats() : nullptr);
}
}
FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) GetProfileStats(), ingressBytesMap);
@@ -1993,7 +1994,8 @@ public:
}
if (auto* transform = transformInfo.AsyncInput) {
- transform->FillExtraStats(protoTask, last);
+ // TODO: support async CA
+ transform->FillExtraStats(protoTask, last, TaskRunner ? TaskRunner->GetBillingStats() : 0);
}
}
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index 71f64e30a5..fdd10d09c2 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -14,10 +14,12 @@ namespace {
class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamValue> {
public:
- TDqInputUnionStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs)
+ TDqInputUnionStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs, TDqBillingStats::TInputStats* stats)
: TComputationValue<TDqInputUnionStreamValue>(memInfo)
, Inputs(std::move(inputs))
- , CurrentItemIndex(0) {}
+ , CurrentItemIndex(0)
+ , Stats(stats)
+ {}
private:
NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final {
@@ -33,6 +35,9 @@ private:
}
result = std::move(CurrentBuffer[CurrentItemIndex]);
+ if (Stats) {
+ Stats->RowsConsumed += 1;
+ }
++CurrentItemIndex;
return NUdf::EFetchStatus::Ok;
}
@@ -56,15 +61,17 @@ private:
TVector<IDqInput::TPtr> Inputs;
TUnboxedValueVector CurrentBuffer;
ui64 CurrentItemIndex;
+ TDqBillingStats::TInputStats* Stats;
};
class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamValue> {
public:
TDqInputMergeStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs,
- TVector<TSortColumnInfo>&& sortCols)
+ TVector<TSortColumnInfo>&& sortCols, TDqBillingStats::TInputStats* stats)
: TComputationValue<TDqInputMergeStreamValue>(memInfo)
, Inputs(std::move(inputs))
, SortCols(std::move(sortCols))
+ , Stats(stats)
{
CurrentBuffers.resize(Inputs.size());
CurrentItemIndexes.reserve(Inputs.size());
@@ -136,6 +143,9 @@ private:
return status;
}
+ if (Stats) {
+ Stats->RowsConsumed += 1;
+ }
result = std::move(FindResult());
return NUdf::EFetchStatus::Ok;
}
@@ -211,20 +221,21 @@ private:
TVector<TUnboxedValuesIterator> CurrentItemIndexes;
ui32 InitializationIndex = 0;
TMap<ui32, EDataSlot> SortColTypes;
+ TDqBillingStats::TInputStats* Stats;
};
} // namespace
NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs,
- const NMiniKQL::THolderFactory& factory)
+ const NMiniKQL::THolderFactory& factory, TDqBillingStats::TInputStats* stats)
{
- return factory.Create<TDqInputUnionStreamValue>(std::move(inputs));
+ return factory.Create<TDqInputUnionStreamValue>(std::move(inputs), stats);
}
NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs,
- TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory)
+ TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqBillingStats::TInputStats* stats)
{
- return factory.Create<TDqInputMergeStreamValue>(std::move(inputs), std::move(sortCols));
+ return factory.Create<TDqInputMergeStreamValue>(std::move(inputs), std::move(sortCols), stats);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.h b/ydb/library/yql/dq/runtime/dq_input_producer.h
index 7a590b23be..bc34c0f0a7 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.h
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.h
@@ -5,10 +5,22 @@
namespace NYql::NDq {
+struct TDqBillingStats {
+ struct TInputStats {
+ ui64 RowsConsumed;
+ };
+ std::vector<std::unique_ptr<TInputStats>> Inputs;
+
+ TInputStats& AddInputs() {
+ Inputs.push_back(std::make_unique<TInputStats>());
+ return *Inputs.back();
+ }
+};
+
NKikimr::NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory);
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, TDqBillingStats::TInputStats*);
NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs,
- TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory);
+ TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqBillingStats::TInputStats*);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index c00ee72aa5..088daf74a8 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -139,21 +139,21 @@ void ValidateParamValue(std::string_view paramName, const TType* type, const NUd
#define LOG(...) do { if (Y_UNLIKELY(LogFunc)) { LogFunc(__VA_ARGS__); } } while (0)
NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type,
- TVector<IDqInput::TPtr>&& inputs, const THolderFactory& holderFactory)
+ TVector<IDqInput::TPtr>&& inputs, const THolderFactory& holderFactory, TDqBillingStats::TInputStats* stats)
{
switch (inputDesc.GetTypeCase()) {
case NYql::NDqProto::TTaskInput::kSource:
Y_VERIFY(inputs.size() == 1);
[[fallthrough]];
case NYql::NDqProto::TTaskInput::kUnionAll:
- return CreateInputUnionValue(std::move(inputs), holderFactory);
+ return CreateInputUnionValue(std::move(inputs), holderFactory, stats);
case NYql::NDqProto::TTaskInput::kMerge: {
const auto& protoSortCols = inputDesc.GetMerge().GetSortColumns();
TVector<TSortColumnInfo> sortColsInfo;
GetColumnsInfo(type, protoSortCols, sortColsInfo);
YQL_ENSURE(!sortColsInfo.empty());
- return CreateInputMergeValue(std::move(inputs), std::move(sortColsInfo), holderFactory);
+ return CreateInputMergeValue(std::move(inputs), std::move(sortColsInfo), holderFactory, stats);
}
default:
YQL_ENSURE(false, "Unknown input type: " << (ui32) inputDesc.GetTypeCase());
@@ -266,6 +266,10 @@ public:
}
}
+ const TDqBillingStats* GetBillingStats() const override {
+ return &BillingStats;
+ }
+
ui64 GetTaskId() const override {
Y_VERIFY(TaskId, "Not prepared yet");
return TaskId;
@@ -515,6 +519,7 @@ public:
for (ui32 i = 0; i < task.InputsSize(); ++i) {
auto& inputDesc = task.GetInputs(i);
+ auto& inputStats = BillingStats.AddInputs();
TVector<IDqInput::TPtr> inputs{Reserve(std::max<ui64>(inputDesc.ChannelsSize(), 1))}; // 1 is for "source" type of input.
TInputTransformInfo* transform = nullptr;
@@ -572,14 +577,14 @@ public:
auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, true);
if (transform) {
- transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory);
+ transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, nullptr);
inputs.clear();
inputs.emplace_back(transform->TransformOutput);
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
- CreateInputUnionValue(std::move(inputs), holderFactory));
+ CreateInputUnionValue(std::move(inputs), holderFactory, &inputStats));
} else {
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
- DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory));
+ DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory, &inputStats));
}
}
@@ -978,6 +983,7 @@ private:
bool CollectBasicStats = false;
bool CollectProfileStats = false;
std::unique_ptr<TDqTaskRunnerStats> Stats;
+ TDqBillingStats BillingStats;
TDuration RunComputeTime;
private:
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 8ce147c3dd..1c0a0a9043 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -5,6 +5,7 @@
#include <ydb/library/yql/dq/runtime/dq_async_output.h>
#include <ydb/library/yql/dq/runtime/dq_compute.h>
#include <ydb/library/yql/dq/runtime/dq_input_channel.h>
+#include <ydb/library/yql/dq/runtime/dq_input_producer.h>
#include <ydb/library/yql/dq/runtime/dq_output_channel.h>
#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
#include <ydb/library/yql/dq/runtime/dq_async_input.h>
@@ -319,6 +320,7 @@ public:
virtual void UpdateStats() = 0;
virtual const TDqTaskRunnerStats* GetStats() const = 0;
+ virtual const TDqBillingStats* GetBillingStats() const = 0;
[[nodiscard]]
virtual TString Save() const = 0;
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
index 62e029b30c..c91962581c 100644
--- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
+++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
@@ -43,6 +43,8 @@ message TCommandHeader {
SINK_IS_FINISHED = 24; // Header -> TIsFinishedResponse
SINK_OUTPUT_TYPE = 25; // Header -> TGetTypeResponse
SINK_STATS = 26; // Header -> TGetSinkStatsResponse
+
+ GET_BILLING_STATS = 27;
};
int32 Version = 1;
@@ -66,6 +68,13 @@ message TPopResponse {
google.protobuf.Any Stats = 4;
}
+message TBillingStatsResponse {
+ message TInputStats {
+ uint64 RowsConsumed = 1;
+ };
+ repeated TInputStats Inputs = 1;
+};
+
message TSinkPopRequest {
uint64 Bytes = 1;
bool Raw = 2;
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 11314e9fe1..848cb287c6 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -244,6 +244,16 @@ public:
}
_exit(127);
}
+
+ NDqProto::TBillingStatsResponse GetBillingStats() {
+ NDqProto::TBillingStatsResponse resp;
+ auto* stats = Runner->GetBillingStats();
+ for (auto& input : stats->Inputs) {
+ auto* i = resp.AddInputs();
+ i->SetRowsConsumed(input->RowsConsumed);
+ }
+ return resp;
+ }
NDqProto::TGetStatsResponse GetStats(ui64 taskId) {
const auto stats = Runner->GetStats();
@@ -585,6 +595,12 @@ public:
GetStats(taskId).Save(&output);
break;
}
+ case NDqProto::TCommandHeader::GET_BILLING_STATS: {
+ Y_ENSURE(header.GetVersion() >= 3);
+ Y_ENSURE(taskId == Runner->GetTaskId());
+ GetBillingStats().Save(&output);
+ break;
+ }
case NDqProto::TCommandHeader::GET_STATS_INPUT: {
Y_ENSURE(header.GetVersion() >= 3);
Y_ENSURE(taskId == Runner->GetTaskId());
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index f8efe2d3d3..e37ce4f05d 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1607,6 +1607,28 @@ public:
void UpdateStats() override {
}
+ const TDqBillingStats* GetBillingStats() const override {
+ try {
+ NDqProto::TCommandHeader header;
+ header.SetVersion(3);
+ header.SetCommand(NDqProto::TCommandHeader::GET_BILLING_STATS);
+ header.SetTaskId(Task.GetId());
+ header.Save(&Delegate->GetOutput());
+
+ NDqProto::TBillingStatsResponse response;
+ response.Load(&Delegate->GetInput());
+
+ BillingStats.Inputs.clear();
+ for (auto input : response.GetInputs()) {
+ auto i = BillingStats.AddInputs();
+ i.RowsConsumed = input.GetRowsConsumed();
+ }
+ return &BillingStats;
+ } catch (...) {
+ Delegate->RaiseException();
+ }
+ }
+
const TDqTaskRunnerStats* GetStats() const override
{
try {
@@ -1648,6 +1670,7 @@ private:
TIntrusivePtr<TTaskRunner> Delegate;
NDqProto::TDqTask Task;
mutable TDqTaskRunnerStats Stats;
+ mutable TDqBillingStats BillingStats;
THashMap<ui64, IDqInputChannel::TPtr> InputChannels;
THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources;