aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-06 18:45:21 +0300
committerssmike <ssmike@ydb.tech>2023-03-06 18:45:21 +0300
commite9af89129288d38b790bbd9b6692b1caca8dad56 (patch)
tree9b4f1112b54d4015d569f6abb147c12b86429d4c
parentb112ecc27f19aaa1069ccccebfd4d743152f9964 (diff)
downloadydb-e9af89129288d38b790bbd9b6692b1caca8dad56.tar.gz
rework flowcontrol
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp98
1 files changed, 66 insertions, 32 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 2dd9222f50d..160b43152ec 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -84,6 +84,7 @@ public:
THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch;
size_t ProcessedRows = 0;
+ size_t PackedRows = 0;
TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult)
: ShardId(shardId)
@@ -315,6 +316,7 @@ public:
void Reset() {
Shard = nullptr;
+ Finished = true;
}
};
@@ -367,6 +369,7 @@ public:
) : nullptr));
}
Counters->ReadActorsCount->Inc();
+ Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId());
}
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -397,9 +400,11 @@ public:
}
void Bootstrap() {
- Counters->ReadActorsCount->Inc();
LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix;
- Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId());
+ }
+
+ void StartTableScan() {
+ ScanStarted = true;
THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint());
PendingShards.PushBack(stateHolder.Get());
auto& state = *stateHolder.Release();
@@ -423,13 +428,12 @@ public:
if (!Settings.HasShardIdHint()) {
ResolveShard(&state);
} else {
- StartTableScan();
+ StartShards();
}
Become(&TKqpReadActor::ReadyState);
- Bootstrapped = true;
}
- bool StartTableScan() {
+ bool StartShards() {
const ui32 maxAllowedInFlight = Settings.GetSorted() ? 1 : MaxInFlight;
CA_LOG_D("effective maxinflight " << maxAllowedInFlight << " sorted " << Settings.GetSorted());
bool isFirst = true;
@@ -676,7 +680,7 @@ public:
}
CA_LOG_D(sb);
}
- StartTableScan();
+ StartShards();
}
void HandleRetry(TEvRetryShard::TPtr& ev) {
@@ -710,7 +714,6 @@ public:
}
auto state = Reads[id].Shard;
- Reads[id].Finished = true;
state->RetryAttempt += 1;
if (state->RetryAttempt >= MAX_SHARD_RETRIES) {
@@ -783,6 +786,7 @@ public:
if (limit) {
record.SetMaxRows(*limit);
}
+ record.SetMaxBytes(Min<ui64>(record.GetMaxBytes(), BufSize));
record.SetResultFormat(Settings.GetDataFormat());
@@ -807,6 +811,10 @@ public:
IEventHandle::FlagTrackDelivery);
}
+ void NotifyCA() {
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ }
+
void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) {
const auto& record = ev->Get()->Record;
auto id = record.GetReadId();
@@ -869,7 +877,7 @@ public:
<< " finished = " << ev->Get()->Record.GetFinished());
CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()));
Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
- Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ NotifyCA();
}
void HandleError(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -919,13 +927,14 @@ public:
return TypeEnv.BindAllocator();
}
- NMiniKQL::TBytesStatistics PackArrow(TResult& handle) {
- auto& [shardId, result, batch, _] = handle;
+ NMiniKQL::TBytesStatistics PackArrow(TResult& handle, i64& freeSpace) {
+ auto& [shardId, result, batch, _, packed] = handle;
NMiniKQL::TBytesStatistics stats;
bool hasResultColumns = false;
if (result->Get()->GetRowsCount() == 0) {
return {};
}
+ YQL_ENSURE(packed == 0);
if (Settings.ColumnsSize() == 0) {
batch->resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer());
} else {
@@ -962,6 +971,8 @@ public:
auto rowsCnt = result->Get()->GetRowsCount();
stats.AddStatistics({sizeof(ui64) * rowsCnt, sizeof(ui64) * rowsCnt});
}
+ freeSpace -= static_cast<i64>(stats.AllocatedBytes);
+ packed = result->Get()->GetRowsCount();
return stats;
}
@@ -982,29 +993,46 @@ public:
return builder;
}
- NMiniKQL::TBytesStatistics PackCells(TResult& handle) {
- auto& [shardId, result, batch, _] = handle;
+ NMiniKQL::TBytesStatistics PackCells(TResult& handle, i64& freeSpace) {
+ auto& [shardId, result, batch, _, packed] = handle;
NMiniKQL::TBytesStatistics stats;
batch->reserve(batch->size());
- for (size_t rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
+ for (size_t rowIndex = packed; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
const auto& row = result->Get()->GetCells(rowIndex);
NUdf::TUnboxedValue* rowItems = nullptr;
batch->emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems));
- size_t rowSize = 0;
+
+ i64 rowSize = 0;
size_t columnIndex = 0;
for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) {
auto tag = Settings.GetColumns(resultColumnIndex).GetId();
+ if (!IsSystemColumn(tag)) {
+ rowSize += row[columnIndex].Size();
+ columnIndex += 1;
+ }
+ }
+ rowSize = std::max(rowSize, (i64)8);
+
+ columnIndex = 0;
+ for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) {
+ auto tag = Settings.GetColumns(resultColumnIndex).GetId();
auto type = MakeTypeInfo(Settings.GetColumns(resultColumnIndex));
if (IsSystemColumn(tag)) {
NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type);
} else {
rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type);
- rowSize += row[columnIndex].Size();
columnIndex += 1;
}
}
- stats.DataBytes += std::max(rowSize, (size_t)8);
+
+ stats.DataBytes += rowSize;
stats.AllocatedBytes += GetRowSize(rowItems).AllocatedBytes;
+ freeSpace -= rowSize;
+ packed = rowIndex + 1;
+
+ if (freeSpace <= 0) {
+ break;
+ }
}
return stats;
}
@@ -1019,36 +1047,35 @@ public:
bool& finished,
i64 freeSpace) override
{
+ if (!ScanStarted) {
+ BufSize = freeSpace;
+ StartTableScan();
+ return 0;
+ }
+
CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size());
ui64 bytes = 0;
while (!Results.empty()) {
auto& result = Results.front();
+
auto& batch = result.Batch;
auto& msg = *result.ReadResult->Get();
if (!batch.Defined()) {
batch.ConstructInPlace();
switch (msg.Record.GetResultFormat()) {
case NKikimrTxDataShard::EScanDataFormat::ARROW:
- BytesStats.AddStatistics(PackArrow(result));
+ BytesStats.AddStatistics(PackArrow(result, freeSpace));
break;
case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
- BytesStats.AddStatistics(PackCells(result));
+ BytesStats.AddStatistics(PackCells(result, freeSpace));
}
}
auto id = result.ReadResult->Get()->Record.GetReadId();
- if (!Reads[id]) {
- Results.pop();
- continue;
- }
- auto* state = Reads[id].Shard;
- for (; result.ProcessedRows < batch->size(); ++result.ProcessedRows) {
+ for (; result.ProcessedRows < result.PackedRows; ++result.ProcessedRows) {
NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[result.ProcessedRows].GetElements());
- if (static_cast<ui64>(freeSpace) < bytes + rowSize.AllocatedBytes) {
- break;
- }
resultVector.push_back(std::move((*batch)[result.ProcessedRows]));
ProcessedRowCount += 1;
bytes += rowSize.AllocatedBytes;
@@ -1060,7 +1087,8 @@ public:
}
CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows; processed " << ProcessedRowCount << " rows");
- if (batch->size() == result.ProcessedRows) {
+ size_t rowCount = result.ReadResult.Get()->Get()->GetRowsCount();
+ if (rowCount == result.ProcessedRows) {
auto& record = msg.Record;
if (!Reads[id].Finished) {
TMaybe<ui64> limit;
@@ -1072,12 +1100,13 @@ public:
auto request = ::DefaultAckSettings();
request->Record.SetReadId(record.GetReadId());
request->Record.SetSeqNo(record.GetSeqNo());
+ request->Record.SetMaxBytes(Min<ui64>(request->Record.GetMaxBytes(), BufSize));
if (limit) {
request->Record.SetMaxRows(*limit);
}
Counters->SentIteratorAcks->Inc();
CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo());
- Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
+ Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, true),
IEventHandle::FlagTrackDelivery);
} else {
Reads[id].Finished = true;
@@ -1091,7 +1120,7 @@ public:
ResetRead(id);
}
- StartTableScan();
+ StartShards();
Results.pop();
CA_LOG_D("dropping batch for read #" << id);
@@ -1105,7 +1134,7 @@ public:
}
}
- if (RunningReads() == 0 && PendingShards.Empty() && Bootstrapped) {
+ if (RunningReads() == 0 && PendingShards.Empty() && ScanStarted) {
finished = true;
}
@@ -1118,6 +1147,10 @@ public:
<< " has limit " << (Settings.GetItemsLimit() != 0)
<< " limit reached " << LimitReached());
+ if (!Results.empty()) {
+ NotifyCA();
+ }
+
return bytes;
}
@@ -1237,7 +1270,8 @@ private:
TString LogPrefix;
TTableId TableId;
- bool Bootstrapped = false;
+ bool ScanStarted = false;
+ size_t BufSize = 0;
const TActorId ComputeActorId;
const ui64 InputIndex;