diff options
author | Mikhail Surin <surinmike@gmail.com> | 2022-06-06 20:16:58 +0300 |
---|---|---|
committer | Mikhail Surin <surinmike@gmail.com> | 2022-06-06 20:16:58 +0300 |
commit | ef94dfade53319dc390170eb18e6322160973a55 (patch) | |
tree | 6b3025a2abbc27ca3c12a3f2618a900180aa129e | |
parent | 6b166539157ef1475a849a381d15c4a1ca582b66 (diff) | |
download | ydb-ef94dfade53319dc390170eb18e6322160973a55.tar.gz |
Support multiple ranges in SystemViewScan
ref:d014de967e9320e73698f51c9c0448a68ae2f828
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_sys_view_ut.cpp | 69 | ||||
-rw-r--r-- | ydb/core/sys_view/scan.cpp | 134 | ||||
-rw-r--r-- | ydb/core/sys_view/scan.h | 3 |
4 files changed, 210 insertions, 19 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index dd24f15d395..7c4a597ce0c 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -89,9 +89,8 @@ public: } TSmallVec<NMiniKQL::TKqpScanComputeContext::TColumn> columns; - TSerializedTableRange* rangeRef = nullptr; - TSerializedTableRange coveringRange; + TVector<TSerializedTableRange> ranges; if (Meta) { YQL_ENSURE(ComputeCtx.GetTableScans().empty()); @@ -107,30 +106,16 @@ public: } const auto& protoRanges = Meta->GetReads()[0].GetKeyRanges(); - if (protoRanges.size() == 1) { - coveringRange.Load(protoRanges[0]); - if (!protoRanges[0].HasTo()) { - coveringRange.To = coveringRange.From; - coveringRange.FromInclusive = coveringRange.ToInclusive = true; - } - } else { - // TODO: FIX IT, verify here? - auto from = TSerializedTableRange(*protoRanges.begin()); - auto to = TSerializedTableRange(*protoRanges.rbegin()); - coveringRange.From = from.From; - coveringRange.FromInclusive = from.FromInclusive; - coveringRange.To = protoRanges.rbegin()->HasTo() ? to.To : to.From; - coveringRange.ToInclusive = protoRanges.rbegin()->HasTo() ? to.ToInclusive : true; + for (auto& range : protoRanges) { + ranges.emplace_back(range); } - - rangeRef = &coveringRange; } if (ScanData) { ScanData->TaskId = GetTask().GetId(); ScanData->TableReader = CreateKqpTableReader(*ScanData); - auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, rangeRef->ToTableRange(), columns); + auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ranges, columns); if (!scanActor) { InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() diff --git a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp index 1f8988517c4..23a0b6ca2a5 100644 --- a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp +++ b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp @@ -90,6 +90,75 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { ])", StreamResultToYson(it)); } + Y_UNIT_TEST(PartitionStatsRanges) { + TKikimrRunner kikimr; + auto client = kikimr.GetTableClient(); + + auto it = client.StreamExecuteScanQuery(R"( + PRAGMA Kikimr.OptEnablePredicateExtract = "true"; + SELECT OwnerId, PartIdx, Path, PathId + FROM `/Root/.sys/partition_stats` + WHERE + OwnerId = 72057594046644480ul + AND PathId = 5u + AND (PartIdx BETWEEN 0 AND 2 OR PartIdx BETWEEN 6 AND 9) + ORDER BY PathId, PartIdx; + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + CompareYson(R"([ + [[72057594046644480u];[0u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[1u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[2u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[6u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[7u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[8u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[9u];["/Root/BatchUpload"];[5u]]; + ])", StreamResultToYson(it)); + } + + Y_UNIT_TEST(PartitionStatsParametricRanges) { + TKikimrRunner kikimr; + auto client = kikimr.GetTableClient(); + + auto paramsBuilder = client.GetParamsBuilder(); + auto params = paramsBuilder + .AddParam("$l1").Int32(0).Build() + .AddParam("$r1").Int32(2).Build() + .AddParam("$l2").Int32(6).Build() + .AddParam("$r2").Int32(9).Build().Build(); + + auto it = client.StreamExecuteScanQuery(R"( + DECLARE $l1 AS Int32; + DECLARE $r1 AS Int32; + + DECLARE $l2 AS Int32; + DECLARE $r2 AS Int32; + + PRAGMA Kikimr.OptEnablePredicateExtract = "true"; + SELECT OwnerId, PartIdx, Path, PathId + FROM `/Root/.sys/partition_stats` + WHERE + OwnerId = 72057594046644480ul + AND PathId = 5u + AND (PartIdx BETWEEN $l1 AND $r1 OR PartIdx BETWEEN $l2 AND $r2) + ORDER BY PathId, PartIdx; + )", params).GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + CompareYson(R"([ + [[72057594046644480u];[0u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[1u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[2u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[6u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[7u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[8u];["/Root/BatchUpload"];[5u]]; + [[72057594046644480u];[9u];["/Root/BatchUpload"];[5u]]; + ])", StreamResultToYson(it)); + } + Y_UNIT_TEST(PartitionStatsRange1) { TKikimrRunner kikimr; auto client = kikimr.GetTableClient(); diff --git a/ydb/core/sys_view/scan.cpp b/ydb/core/sys_view/scan.cpp index f337cb6ba77..7cdb56bc6fe 100644 --- a/ydb/core/sys_view/scan.cpp +++ b/ydb/core/sys_view/scan.cpp @@ -1,5 +1,7 @@ #include "scan.h" +#include <ydb/core/kqp/kqp_compute.h> + #include <ydb/core/sys_view/common/schema.h> #include <ydb/core/sys_view/partition_stats/partition_stats.h> #include <ydb/core/sys_view/nodes/nodes.h> @@ -12,9 +14,141 @@ #include <ydb/core/sys_view/storage/storage_stats.h> #include <ydb/core/sys_view/tablets/tablets.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/core/log.h> + namespace NKikimr { namespace NSysView { +class TSysViewRangesReader : public TActor<TSysViewRangesReader> { +public: + using TBase = TActor<TSysViewRangesReader>; + + TSysViewRangesReader( + const TActorId& ownerId, + ui32 scanId, + const TTableId& tableId, + TVector<TSerializedTableRange> ranges, + const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns) + : TBase(&TSysViewRangesReader::ScanState) + , OwnerId(ownerId) + , ScanId(scanId) + , TableId(tableId) + , Ranges(std::move(ranges)) + , Columns(columns.begin(), columns.end()) + { + } + + static constexpr auto ActorActivityType() { + return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN; + } + + STFUNC(ScanState) { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + hFunc(NKikimr::NKqp::TEvKqpCompute::TEvScanData, Handle); + hFunc(NKikimr::NKqp::TEvKqpCompute::TEvScanError, ResendToOwnerAndDie); + hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution); + cFunc(TEvents::TEvPoison::EventType, PassAway); + hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, ResendToOwnerAndDie); + hFunc(TEvents::TEvUndelivered, ResendToOwnerAndDie); + hFunc(NKqp::TEvKqpCompute::TEvScanInitActor, Handle); + default: + LOG_CRIT(ctx, NKikimrServices::SYSTEM_VIEWS, + "NSysView: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + + void Handle(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& msg) { + ActorIdToProto(SelfId(), msg->Get()->Record.MutableScanActorId()); + Send(OwnerId, msg->Release().Release()); + } + + void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ack) { + Y_VERIFY_DEBUG(ack->Sender == OwnerId); + if (!ScanActorId) { + if (CurrentRange < Ranges.size()) { + auto actor = CreateSystemViewScan( + SelfId(), ScanId, TableId, Ranges[CurrentRange].ToTableRange(), + Columns); + ScanActorId = Register(actor.Release()); + CurrentRange += 1; + } else { + PassAway(); + return; + } + } + + Send(*ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(ack->Get()->FreeSpace, ack->Get()->Generation)); + } + + void Handle(NKqp::TEvKqpCompute::TEvScanData::TPtr& data) { + bool& finished = (*data->Get()).Finished; + if (finished) { + if (CurrentRange != Ranges.size()) { + finished = false; + ScanActorId.Clear(); + } else { + TBase::Send(OwnerId, THolder(data->Release().Release())); + PassAway(); + return; + } + } + + TBase::Send(OwnerId, THolder(data->Release().Release())); + } + + void HandleAbortExecution(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev) { + LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::SYSTEM_VIEWS, + "Got abort execution event, actor: " << TBase::SelfId() + << ", owner: " << OwnerId + << ", scan id: " << ScanId + << ", table id: " << TableId + << ", code: " << NYql::NDqProto::StatusIds::StatusCode_Name(ev->Get()->Record.GetStatusCode()) + << ", error: " << ev->Get()->GetIssues().ToOneLineString()); + + if (ScanActorId) { + Send(*ScanActorId, THolder(ev->Release().Release())); + } + + PassAway(); + } + + void ResendToOwnerAndDie(auto& err) { + TBase::Send(OwnerId, err->Release().Release()); + PassAway(); + } + + void PassAway() { + if (ScanActorId) { + Send(*ScanActorId, new TEvents::TEvPoison()); + } + TBase::PassAway(); + } + +private: + TActorId OwnerId; + ui32 ScanId; + TTableId TableId; + TVector<TSerializedTableRange> Ranges; + TVector<NMiniKQL::TKqpComputeContextBase::TColumn> Columns; + + ui64 CurrentRange = 0; + TMaybe<TActorId> ScanActorId; +}; + +THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, + TVector<TSerializedTableRange> ranges, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns) +{ + if (ranges.size() == 1) { + return CreateSystemViewScan(ownerId, scanId, tableId, ranges[0].ToTableRange(), columns); + } else { + return MakeHolder<TSysViewRangesReader>(ownerId, scanId, tableId, ranges, columns); + } +} + THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns) { diff --git a/ydb/core/sys_view/scan.h b/ydb/core/sys_view/scan.h index bfac578bfae..d85e7640efb 100644 --- a/ydb/core/sys_view/scan.h +++ b/ydb/core/sys_view/scan.h @@ -6,6 +6,9 @@ namespace NKikimr { namespace NSysView { THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, + TVector<TSerializedTableRange> ranges, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns); + +THolder<IActor> CreateSystemViewScan(const TActorId& ownerId, ui32 scanId, const TTableId& tableId, const TTableRange& tableRange, const TArrayRef<NMiniKQL::TKqpComputeContextBase::TColumn>& columns); } // NSysView |