aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-29 17:33:20 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-29 17:33:20 +0300
commite3ec0b86507a1915e9ac47235efab317582fea67 (patch)
tree6c6fad645828a3f1fad2437d9e6ae39e7c598031
parent82e2b3d036ee20d7fccdfc48c7e65da523d93a70 (diff)
downloadydb-e3ec0b86507a1915e9ac47235efab317582fea67.tar.gz
speed up scanners requesting in case global memory limits usage
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_events.h13
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_state.h1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h19
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp32
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h7
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/core/protos/services.proto4
8 files changed, 61 insertions, 20 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index e0cd067901c..0d7101c4144 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -30,6 +30,7 @@ struct TEvKqpCompute {
, Generation(generation)
, Finished(false) {}
+ std::optional<ui32> AvailablePacks;
ui32 ScanId;
ui32 Generation;
TVector<TOwnedCellVec> Rows;
@@ -52,11 +53,7 @@ struct TEvKqpCompute {
}
bool IsEmpty() const {
- if (ArrowBatch) {
- return ArrowBatch->num_rows() == 0;
- } else {
- return Rows.size() == 0;
- }
+ return GetRowsCount() == 0;
}
bool IsSerializable() const override {
@@ -93,6 +90,9 @@ struct TEvKqpCompute {
ev->Finished = pbEv->Record.GetFinished();
ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached();
ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells());
+ if (pbEv->Record.HasAvailablePacks()) {
+ ev->AvailablePacks = pbEv->Record.GetAvailablePacks();
+ }
auto rows = pbEv->Record.GetRows();
ev->Rows.reserve(rows.size());
@@ -123,6 +123,9 @@ struct TEvKqpCompute {
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetPageFault(PageFault);
Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
+ if (AvailablePacks) {
+ Remote->Record.SetAvailablePacks(*AvailablePacks);
+ }
switch (GetDataFormat()) {
case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h
index 8edbc0ffd75..674ced5176b 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_state.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h
@@ -39,6 +39,7 @@ struct TShardState: public TCommonRetriesState {
bool SubscribedOnTablet = false;
TActorId ActorId;
TOwnedCellVec LastKey;
+ std::optional<ui32> AvailablePacks;
TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
index 79b2c068745..f904353d686 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
@@ -26,6 +26,10 @@ private:
return TBase::CalcMkqlMemoryLimit() + ComputeCtx.GetTableScans().size() * MemoryLimits.ChannelBufferSize;
}
public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
+ }
+
TKqpScanComputeActor(const TActorId& executerId, ui64 txId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index 8e183b6d09a..eee64d41e81 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -18,6 +18,10 @@ public:
: ShardState(shardState) {
}
+
+ bool operator<(const TWaitingShard& item) const {
+ return ShardState->AvailablePacks.value_or(0) < item.ShardState->AvailablePacks.value_or(0);
+ }
};
class TFreeComputeActor {
@@ -42,7 +46,7 @@ private:
std::map<ui64, TFreeComputeActor> ComputeActorsWaitData;
std::set<ui64> ShardsWaitingIds;
- std::deque<TWaitingShard> ShardsWaiting;
+ std::vector<TWaitingShard> ShardsWaiting;
void DetachComputeActorFromShard(const ui64 tabletId) {
ShardsWaitingIds.erase(tabletId);
@@ -97,8 +101,9 @@ public:
return false;
} else {
Y_VERIFY(ShardsWaitingIds.erase(ShardsWaiting.front().GetShardState()->TabletId));
- result = std::move(ShardsWaiting.front());
- ShardsWaiting.pop_front();
+ std::pop_heap(ShardsWaiting.begin(), ShardsWaiting.end());
+ result = std::move(ShardsWaiting.back());
+ ShardsWaiting.pop_back();
return true;
}
}
@@ -126,10 +131,18 @@ public:
}
}
+ bool ReturnShardInPool(TShardState::TPtr state) {
+ Y_VERIFY(ShardsWaitingIds.emplace(state->TabletId).second);
+ ShardsWaiting.emplace_back(TWaitingShard(state));
+ std::push_heap(ShardsWaiting.begin(), ShardsWaiting.end());
+ return true;
+ }
+
bool PrepareShardAck(TShardState::TPtr state, ui64& freeSpace) {
if (ComputeActorsWaitShard.empty()) {
Y_VERIFY(ShardsWaitingIds.emplace(state->TabletId).second);
ShardsWaiting.emplace_back(TWaitingShard(state));
+ std::push_heap(ShardsWaiting.begin(), ShardsWaiting.end());
return false;
}
std::pop_heap(ComputeActorsWaitShard.begin(), ComputeActorsWaitShard.end());
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index 876116fe92e..62a91c1d161 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -22,7 +22,7 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3;
} // anonymous namespace
-NScanPrivate::TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot,
+TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot,
const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
@@ -86,6 +86,7 @@ void TKqpScanFetcherActor::Bootstrap() {
}
void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) {
+ Y_VERIFY(ev->Get()->GetFreeSpace());
ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "EvAckData (" << SelfId() << "): " << ev->Sender;
if (!InFlightComputes.OnComputeAck(ev->Sender, ev->Get()->GetFreeSpace())) {
ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "EvAckData (" << SelfId() << "): " << ev->Sender << " IGNORED";
@@ -95,7 +96,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev)
}
void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvTerminateFromCompute::TPtr& ev) {
- ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "TEvTerminateFromCompute: " << ev->Sender;
+ AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "TEvTerminateFromCompute")("sender", ev->Sender);
for (auto&& itTablet : InFlightShards) {
for (auto&& it : itTablet.second) {
auto state = it.second;
@@ -115,6 +116,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvTerminateFromComput
}
}
}
+ PassAway();
}
void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
@@ -429,6 +431,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvInterconnect::TEvNodeDisconnected::T
if (state->ActorId && state->ActorId.NodeId() == nodeId) {
SendGlobalFail(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR,
TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ break;
}
}
}
@@ -448,12 +451,13 @@ bool TKqpScanFetcherActor::SendGlobalFail(const NDqProto::EComputeState state, N
return true;
}
-bool TKqpScanFetcherActor::ProvideDataToCompute(TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state) {
+bool TKqpScanFetcherActor::ProvideDataToCompute(const NActors::TActorId& scannerId, TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state) noexcept {
if (msg.IsEmpty()) {
InFlightComputes.OnEmptyDataReceived(state->TabletId, msg.RequestedBytesLimitReached || msg.Finished);
} else {
- ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "PROVIDING (FROM " << SelfId() << "): used free compute " << InFlightComputes.DebugString();
auto computeActorInfo = InFlightComputes.OnDataReceived(state->TabletId, msg.RequestedBytesLimitReached || msg.Finished);
+ ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << SelfId() << " PROVIDING (FROM " << scannerId << " to " << computeActorInfo.GetActorId() <<
+ "): used free compute " << InFlightComputes.DebugString();
Send(computeActorInfo.GetActorId(), new TEvScanExchange::TEvSendData(msg, state->TabletId));
}
return true;
@@ -518,7 +522,7 @@ THolder<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan(
return ev;
}
-void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) {
+void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) noexcept {
auto& msg = *ev->Get();
auto state = GetShardState(msg, ev->Sender);
@@ -542,17 +546,22 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
<< ";in flight shards=" << InFlightShards.GetShardsCount()
<< ";delayed_for=" << latency.SecondsFloat() << " seconds by ratelimiter"
<< ";tablet_id=" << state->TabletId);
- ProvideDataToCompute(msg, state);
+ ProvideDataToCompute(ev->Sender, msg, state);
+ state->AvailablePacks = msg.AvailablePacks;
InFlightShards.MutableStatistics(state->TabletId).AddPack(rowsCount, 0);
-
Stats.AddReadStat(state->ScannerIdx, rowsCount, 0);
bool stopFinally = false;
- CA_LOG_T("EVLOGKQP:" << IsAggregationRequest << "/" << Meta.GetItemsLimit() << "/" << InFlightShards.GetTotalRowsCount() << "/" << rowsCount);
+ CA_LOG_D("EVLOGKQP:" << IsAggregationRequest << "/" << Meta.GetItemsLimit() << "/" << InFlightShards.GetTotalRowsCount() << "/" << rowsCount);
if (!msg.Finished) {
if (msg.RequestedBytesLimitReached) {
InFlightShards.NeedAck(state);
- SendScanDataAck(state);
+ if (!state->AvailablePacks || *state->AvailablePacks == 0) {
+ ReturnShardInPool(state);
+ DoAckAvailableWaiting();
+ } else {
+ SendScanDataAck(state);
+ }
}
} else {
CA_LOG_D("Chunk " << state->TabletId << "/" << state->ScannerIdx << " scan finished");
@@ -627,6 +636,10 @@ void TKqpScanFetcherActor::StartReadShard(TShardState::TPtr state) {
SendStartScanRequest(state, state->Generation);
}
+bool TKqpScanFetcherActor::ReturnShardInPool(TShardState::TPtr state) {
+ return InFlightComputes.ReturnShardInPool(state);
+}
+
bool TKqpScanFetcherActor::SendScanDataAck(TShardState::TPtr state) {
ui64 freeSpace;
if (!InFlightComputes.PrepareShardAck(state, freeSpace)) {
@@ -678,6 +691,7 @@ void TKqpScanFetcherActor::RetryDeliveryProblem(TShardState::TPtr state) {
// so after several consecutive delivery problem responses retry logic should
// resolve shard details again.
if (state->RetryAttempt >= MAX_SHARD_RETRIES) {
+ Send(state->ActorId, new NActors::TEvents::TEvPoisonPill());
return ResolveShard(*state);
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
index a87790969f5..04b58eae0bb 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
@@ -50,9 +50,10 @@ private:
const NMiniKQL::TScanDataMetaFull ScanDataMeta;
const NYql::NDq::TComputeRuntimeSettings RuntimeSettings;
const NYql::NDq::TTxId TxId;
+ bool ReturnShardInPool(TShardState::TPtr state);
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
+ return NKikimrServices::TActivity::KQP_SCAN_FETCH_ACTOR;
}
TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const NYql::NDq::TComputeRuntimeSettings& settings,
@@ -102,7 +103,7 @@ private:
bool SendGlobalFail(const NYql::NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& issues) const;
- bool ProvideDataToCompute(TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state);
+ bool ProvideDataToCompute(const NActors::TActorId& scannerId, TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state) noexcept;
bool SendScanFinished();
@@ -112,7 +113,7 @@ private:
void HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev);
- void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt);
+ void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) noexcept;
void ProcessScanData();
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index f52e14fae1d..5f5235409d1 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -585,6 +585,7 @@ message TEvRemoteScanData {
optional TArrowBatch ArrowBatch = 10;
optional bool RequestedBytesLimitReached = 11 [default = false];
+ optional uint32 AvailablePacks = 12;
}
message TEvRemoteScanDataAck {
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index f5ef8c1a9a2..121ee7f9bcc 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -371,6 +371,8 @@ enum EServiceKikimr {
ARROW_HELPER = 2100;
KAFKA_PROXY = 2200;
+
+ OBJECTS_MONITORING = 2300;
};
message TActivity {
@@ -991,5 +993,7 @@ message TActivity {
FEDERATION_DISCOVERY = 611;
GRPC_REQ_SHARD_WRITER = 612;
CMS_API_ADAPTER = 613;
+ KQP_SCAN_FETCH_ACTOR = 614;
+ COLUMNSHARD_CONVEYOR = 615;
};
};