diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-06 18:45:21 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-06 18:45:21 +0300 |
commit | e9af89129288d38b790bbd9b6692b1caca8dad56 (patch) | |
tree | 9b4f1112b54d4015d569f6abb147c12b86429d4c | |
parent | b112ecc27f19aaa1069ccccebfd4d743152f9964 (diff) | |
download | ydb-e9af89129288d38b790bbd9b6692b1caca8dad56.tar.gz |
rework flowcontrol
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 98 |
1 files changed, 66 insertions, 32 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 2dd9222f50d..160b43152ec 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -84,6 +84,7 @@ public: THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch; size_t ProcessedRows = 0; + size_t PackedRows = 0; TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult) : ShardId(shardId) @@ -315,6 +316,7 @@ public: void Reset() { Shard = nullptr; + Finished = true; } }; @@ -367,6 +369,7 @@ public: ) : nullptr)); } Counters->ReadActorsCount->Inc(); + Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId()); } static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -397,9 +400,11 @@ public: } void Bootstrap() { - Counters->ReadActorsCount->Inc(); LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix; - Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId()); + } + + void StartTableScan() { + ScanStarted = true; THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint()); PendingShards.PushBack(stateHolder.Get()); auto& state = *stateHolder.Release(); @@ -423,13 +428,12 @@ public: if (!Settings.HasShardIdHint()) { ResolveShard(&state); } else { - StartTableScan(); + StartShards(); } Become(&TKqpReadActor::ReadyState); - Bootstrapped = true; } - bool StartTableScan() { + bool StartShards() { const ui32 maxAllowedInFlight = Settings.GetSorted() ? 1 : MaxInFlight; CA_LOG_D("effective maxinflight " << maxAllowedInFlight << " sorted " << Settings.GetSorted()); bool isFirst = true; @@ -676,7 +680,7 @@ public: } CA_LOG_D(sb); } - StartTableScan(); + StartShards(); } void HandleRetry(TEvRetryShard::TPtr& ev) { @@ -710,7 +714,6 @@ public: } auto state = Reads[id].Shard; - Reads[id].Finished = true; state->RetryAttempt += 1; if (state->RetryAttempt >= MAX_SHARD_RETRIES) { @@ -783,6 +786,7 @@ public: if (limit) { record.SetMaxRows(*limit); } + record.SetMaxBytes(Min<ui64>(record.GetMaxBytes(), BufSize)); record.SetResultFormat(Settings.GetDataFormat()); @@ -807,6 +811,10 @@ public: IEventHandle::FlagTrackDelivery); } + void NotifyCA() { + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + } + void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) { const auto& record = ev->Get()->Record; auto id = record.GetReadId(); @@ -869,7 +877,7 @@ public: << " finished = " << ev->Get()->Record.GetFinished()); CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get())); Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); - Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + NotifyCA(); } void HandleError(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -919,13 +927,14 @@ public: return TypeEnv.BindAllocator(); } - NMiniKQL::TBytesStatistics PackArrow(TResult& handle) { - auto& [shardId, result, batch, _] = handle; + NMiniKQL::TBytesStatistics PackArrow(TResult& handle, i64& freeSpace) { + auto& [shardId, result, batch, _, packed] = handle; NMiniKQL::TBytesStatistics stats; bool hasResultColumns = false; if (result->Get()->GetRowsCount() == 0) { return {}; } + YQL_ENSURE(packed == 0); if (Settings.ColumnsSize() == 0) { batch->resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer()); } else { @@ -962,6 +971,8 @@ public: auto rowsCnt = result->Get()->GetRowsCount(); stats.AddStatistics({sizeof(ui64) * rowsCnt, sizeof(ui64) * rowsCnt}); } + freeSpace -= static_cast<i64>(stats.AllocatedBytes); + packed = result->Get()->GetRowsCount(); return stats; } @@ -982,29 +993,46 @@ public: return builder; } - NMiniKQL::TBytesStatistics PackCells(TResult& handle) { - auto& [shardId, result, batch, _] = handle; + NMiniKQL::TBytesStatistics PackCells(TResult& handle, i64& freeSpace) { + auto& [shardId, result, batch, _, packed] = handle; NMiniKQL::TBytesStatistics stats; batch->reserve(batch->size()); - for (size_t rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { + for (size_t rowIndex = packed; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { const auto& row = result->Get()->GetCells(rowIndex); NUdf::TUnboxedValue* rowItems = nullptr; batch->emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems)); - size_t rowSize = 0; + + i64 rowSize = 0; size_t columnIndex = 0; for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { auto tag = Settings.GetColumns(resultColumnIndex).GetId(); + if (!IsSystemColumn(tag)) { + rowSize += row[columnIndex].Size(); + columnIndex += 1; + } + } + rowSize = std::max(rowSize, (i64)8); + + columnIndex = 0; + for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { + auto tag = Settings.GetColumns(resultColumnIndex).GetId(); auto type = MakeTypeInfo(Settings.GetColumns(resultColumnIndex)); if (IsSystemColumn(tag)) { NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type); } else { rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type); - rowSize += row[columnIndex].Size(); columnIndex += 1; } } - stats.DataBytes += std::max(rowSize, (size_t)8); + + stats.DataBytes += rowSize; stats.AllocatedBytes += GetRowSize(rowItems).AllocatedBytes; + freeSpace -= rowSize; + packed = rowIndex + 1; + + if (freeSpace <= 0) { + break; + } } return stats; } @@ -1019,36 +1047,35 @@ public: bool& finished, i64 freeSpace) override { + if (!ScanStarted) { + BufSize = freeSpace; + StartTableScan(); + return 0; + } + CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size()); ui64 bytes = 0; while (!Results.empty()) { auto& result = Results.front(); + auto& batch = result.Batch; auto& msg = *result.ReadResult->Get(); if (!batch.Defined()) { batch.ConstructInPlace(); switch (msg.Record.GetResultFormat()) { case NKikimrTxDataShard::EScanDataFormat::ARROW: - BytesStats.AddStatistics(PackArrow(result)); + BytesStats.AddStatistics(PackArrow(result, freeSpace)); break; case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: case NKikimrTxDataShard::EScanDataFormat::CELLVEC: - BytesStats.AddStatistics(PackCells(result)); + BytesStats.AddStatistics(PackCells(result, freeSpace)); } } auto id = result.ReadResult->Get()->Record.GetReadId(); - if (!Reads[id]) { - Results.pop(); - continue; - } - auto* state = Reads[id].Shard; - for (; result.ProcessedRows < batch->size(); ++result.ProcessedRows) { + for (; result.ProcessedRows < result.PackedRows; ++result.ProcessedRows) { NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[result.ProcessedRows].GetElements()); - if (static_cast<ui64>(freeSpace) < bytes + rowSize.AllocatedBytes) { - break; - } resultVector.push_back(std::move((*batch)[result.ProcessedRows])); ProcessedRowCount += 1; bytes += rowSize.AllocatedBytes; @@ -1060,7 +1087,8 @@ public: } CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows; processed " << ProcessedRowCount << " rows"); - if (batch->size() == result.ProcessedRows) { + size_t rowCount = result.ReadResult.Get()->Get()->GetRowsCount(); + if (rowCount == result.ProcessedRows) { auto& record = msg.Record; if (!Reads[id].Finished) { TMaybe<ui64> limit; @@ -1072,12 +1100,13 @@ public: auto request = ::DefaultAckSettings(); request->Record.SetReadId(record.GetReadId()); request->Record.SetSeqNo(record.GetSeqNo()); + request->Record.SetMaxBytes(Min<ui64>(request->Record.GetMaxBytes(), BufSize)); if (limit) { request->Record.SetMaxRows(*limit); } Counters->SentIteratorAcks->Inc(); CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo()); - Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), + Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, true), IEventHandle::FlagTrackDelivery); } else { Reads[id].Finished = true; @@ -1091,7 +1120,7 @@ public: ResetRead(id); } - StartTableScan(); + StartShards(); Results.pop(); CA_LOG_D("dropping batch for read #" << id); @@ -1105,7 +1134,7 @@ public: } } - if (RunningReads() == 0 && PendingShards.Empty() && Bootstrapped) { + if (RunningReads() == 0 && PendingShards.Empty() && ScanStarted) { finished = true; } @@ -1118,6 +1147,10 @@ public: << " has limit " << (Settings.GetItemsLimit() != 0) << " limit reached " << LimitReached()); + if (!Results.empty()) { + NotifyCA(); + } + return bytes; } @@ -1237,7 +1270,8 @@ private: TString LogPrefix; TTableId TableId; - bool Bootstrapped = false; + bool ScanStarted = false; + size_t BufSize = 0; const TActorId ComputeActorId; const ui64 InputIndex; |