aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikhail Surin <surinmike@gmail.com>2022-06-06 20:16:58 +0300
committerMikhail Surin <surinmike@gmail.com>2022-06-06 20:16:58 +0300
commitef94dfade53319dc390170eb18e6322160973a55 (patch)
tree6b3025a2abbc27ca3c12a3f2618a900180aa129e
parent6b166539157ef1475a849a381d15c4a1ca582b66 (diff)
downloadydb-ef94dfade53319dc390170eb18e6322160973a55.tar.gz
Support multiple ranges in SystemViewScan
ref:d014de967e9320e73698f51c9c0448a68ae2f828
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp23
-rw-r--r--ydb/core/kqp/ut/kqp_sys_view_ut.cpp69
-rw-r--r--ydb/core/sys_view/scan.cpp134
-rw-r--r--ydb/core/sys_view/scan.h3
4 files changed, 210 insertions, 19 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index dd24f15d395..7c4a597ce0c 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -89,9 +89,8 @@ public:
}
TSmallVec<NMiniKQL::TKqpScanComputeContext::TColumn> columns;
- TSerializedTableRange* rangeRef = nullptr;
- TSerializedTableRange coveringRange;
+ TVector<TSerializedTableRange> ranges;
if (Meta) {
YQL_ENSURE(ComputeCtx.GetTableScans().empty());
@@ -107,30 +106,16 @@ public:
}
const auto& protoRanges = Meta->GetReads()[0].GetKeyRanges();
- if (protoRanges.size() == 1) {
- coveringRange.Load(protoRanges[0]);
- if (!protoRanges[0].HasTo()) {
- coveringRange.To = coveringRange.From;
- coveringRange.FromInclusive = coveringRange.ToInclusive = true;
- }
- } else {
- // TODO: FIX IT, verify here?
- auto from = TSerializedTableRange(*protoRanges.begin());
- auto to = TSerializedTableRange(*protoRanges.rbegin());
- coveringRange.From = from.From;
- coveringRange.FromInclusive = from.FromInclusive;
- coveringRange.To = protoRanges.rbegin()->HasTo() ? to.To : to.From;
- coveringRange.ToInclusive = protoRanges.rbegin()->HasTo() ? to.ToInclusive : true;
+ for (auto& range : protoRanges) {
+ ranges.emplace_back(range);
}
-
- rangeRef = &coveringRange;
}
if (ScanData) {
ScanData->TaskId = GetTask().GetId();
ScanData->TableReader = CreateKqpTableReader(*ScanData);
- auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, rangeRef->ToTableRange(), columns);
+ auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ranges, columns);
if (!scanActor) {
InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
diff --git a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
index 1f8988517c4..23a0b6ca2a5 100644
--- a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
@@ -90,6 +90,75 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
])", StreamResultToYson(it));
}
+ Y_UNIT_TEST(PartitionStatsRanges) {
+ TKikimrRunner kikimr;
+ auto client = kikimr.GetTableClient();
+
+ auto it = client.StreamExecuteScanQuery(R"(
+ PRAGMA Kikimr.OptEnablePredicateExtract = "true";
+ SELECT OwnerId, PartIdx, Path, PathId
+ FROM `/Root/.sys/partition_stats`
+ WHERE
+ OwnerId = 72057594046644480ul
+ AND PathId = 5u
+ AND (PartIdx BETWEEN 0 AND 2 OR PartIdx BETWEEN 6 AND 9)
+ ORDER BY PathId, PartIdx;
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ CompareYson(R"([
+ [[72057594046644480u];[0u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[1u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[2u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[6u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[7u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[8u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[9u];["/Root/BatchUpload"];[5u]];
+ ])", StreamResultToYson(it));
+ }
+
+ Y_UNIT_TEST(PartitionStatsParametricRanges) {
+ TKikimrRunner kikimr;
+ auto client = kikimr.GetTableClient();
+
+ auto paramsBuilder = client.GetParamsBuilder();
+ auto params = paramsBuilder
+ .AddParam("$l1").Int32(0).Build()
+ .AddParam("$r1").Int32(2).Build()
+ .AddParam("$l2").Int32(6).Build()
+ .AddParam("$r2").Int32(9).Build().Build();
+
+ auto it = client.StreamExecuteScanQuery(R"(
+ DECLARE $l1 AS Int32;
+ DECLARE $r1 AS Int32;
+
+ DECLARE $l2 AS Int32;
+ DECLARE $r2 AS Int32;
+
+ PRAGMA Kikimr.OptEnablePredicateExtract = "true";
+ SELECT OwnerId, PartIdx, Path, PathId
+ FROM `/Root/.sys/partition_stats`
+ WHERE
+ OwnerId = 72057594046644480ul
+ AND PathId = 5u
+ AND (PartIdx BETWEEN $l1 AND $r1 OR PartIdx BETWEEN $l2 AND $r2)
+ ORDER BY PathId, PartIdx;
+ )", params).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ CompareYson(R"([
+ [[72057594046644480u];[0u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[1u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[2u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[6u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[7u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[8u];["/Root/BatchUpload"];[5u]];
+ [[72057594046644480u];[9u];["/Root/BatchUpload"];[5u]];
+ ])", StreamResultToYson(it));
+ }
+
Y_UNIT_TEST(PartitionStatsRange1) {
TKikimrRunner kikimr;
auto client = kikimr.GetTableClient();
diff --git a/ydb/core/sys_view/scan.cpp b/ydb/core/sys_view/scan.cpp
index f337cb6ba77..7cdb56bc6fe 100644
--- a/ydb/core/sys_view/scan.cpp
+++ b/ydb/core/sys_view/scan.cpp
@@ -1,5 +1,7 @@
#include "scan.h"
+#include <ydb/core/kqp/kqp_compute.h>
+
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/sys_view/partition_stats/partition_stats.h>
#include <ydb/core/sys_view/nodes/nodes.h>
@@ -12,9 +14,141 @@
#include <ydb/core/sys_view/storage/storage_stats.h>
#include <ydb/core/sys_view/tablets/tablets.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/log.h>
+
namespace NKikimr {
namespace NSysView {
+class TSysViewRangesReader : public TActor<TSysViewRangesReader> {
+public:
+ using TBase = TActor<TSysViewRangesReader>;
+
+ TSysViewRangesReader(
+ const TActorId& ownerId,
+ ui32 scanId,
+ const TTableId& tableId,
+ TVector<TSerializedTableRange> ranges,
+ const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns)
+ : TBase(&TSysViewRangesReader::ScanState)
+ , OwnerId(ownerId)
+ , ScanId(scanId)
+ , TableId(tableId)
+ , Ranges(std::move(ranges))
+ , Columns(columns.begin(), columns.end())
+ {
+ }
+
+ static constexpr auto ActorActivityType() {
+ return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN;
+ }
+
+ STFUNC(ScanState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle);
+ hFunc(NKikimr::NKqp::TEvKqpCompute::TEvScanData, Handle);
+ hFunc(NKikimr::NKqp::TEvKqpCompute::TEvScanError, ResendToOwnerAndDie);
+ hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, ResendToOwnerAndDie);
+ hFunc(TEvents::TEvUndelivered, ResendToOwnerAndDie);
+ hFunc(NKqp::TEvKqpCompute::TEvScanInitActor, Handle);
+ default:
+ LOG_CRIT(ctx, NKikimrServices::SYSTEM_VIEWS,
+ "NSysView: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ }
+ }
+
+ void Handle(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& msg) {
+ ActorIdToProto(SelfId(), msg->Get()->Record.MutableScanActorId());
+ Send(OwnerId, msg->Release().Release());
+ }
+
+ void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ack) {
+ Y_VERIFY_DEBUG(ack->Sender == OwnerId);
+ if (!ScanActorId) {
+ if (CurrentRange < Ranges.size()) {
+ auto actor = CreateSystemViewScan(
+ SelfId(), ScanId, TableId, Ranges[CurrentRange].ToTableRange(),
+ Columns);
+ ScanActorId = Register(actor.Release());
+ CurrentRange += 1;
+ } else {
+ PassAway();
+ return;
+ }
+ }
+
+ Send(*ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(ack->Get()->FreeSpace, ack->Get()->Generation));
+ }
+
+ void Handle(NKqp::TEvKqpCompute::TEvScanData::TPtr& data) {
+ bool& finished = (*data->Get()).Finished;
+ if (finished) {
+ if (CurrentRange != Ranges.size()) {
+ finished = false;
+ ScanActorId.Clear();
+ } else {
+ TBase::Send(OwnerId, THolder(data->Release().Release()));
+ PassAway();
+ return;
+ }
+ }
+
+ TBase::Send(OwnerId, THolder(data->Release().Release()));
+ }
+
+ void HandleAbortExecution(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev) {
+ LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::SYSTEM_VIEWS,
+ "Got abort execution event, actor: " << TBase::SelfId()
+ << ", owner: " << OwnerId
+ << ", scan id: " << ScanId
+ << ", table id: " << TableId
+ << ", code: " << NYql::NDqProto::StatusIds::StatusCode_Name(ev->Get()->Record.GetStatusCode())
+ << ", error: " << ev->Get()->GetIssues().ToOneLineString());
+
+ if (ScanActorId) {
+ Send(*ScanActorId, THolder(ev->Release().Release()));
+ }
+
+ PassAway();
+ }
+
+ void ResendToOwnerAndDie(auto& err) {
+ TBase::Send(OwnerId, err->Release().Release());
+ PassAway();
+ }
+
+ void PassAway() {
+ if (ScanActorId) {
+ Send(*ScanActorId, new TEvents::TEvPoison());
+ }
+ TBase::PassAway();
+ }
+
+private:
+ TActorId OwnerId;
+ ui32 ScanId;
+ TTableId TableId;
+ TVector<TSerializedTableRange> Ranges;
+ TVector<NMiniKQL::TKqpComputeContextBase::TColumn> Columns;
+
+ ui64 CurrentRange = 0;
+ TMaybe<TActorId> ScanActorId;
+};
+
+THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
+ TVector<TSerializedTableRange> ranges, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns)
+{
+ if (ranges.size() == 1) {
+ return CreateSystemViewScan(ownerId, scanId, tableId, ranges[0].ToTableRange(), columns);
+ } else {
+ return MakeHolder<TSysViewRangesReader>(ownerId, scanId, tableId, ranges, columns);
+ }
+}
+
THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns)
{
diff --git a/ydb/core/sys_view/scan.h b/ydb/core/sys_view/scan.h
index bfac578bfae..d85e7640efb 100644
--- a/ydb/core/sys_view/scan.h
+++ b/ydb/core/sys_view/scan.h
@@ -6,6 +6,9 @@ namespace NKikimr {
namespace NSysView {
THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
+ TVector<TSerializedTableRange> ranges, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns);
+
+THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId,
const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns);
} // NSysView