diff options
author | ssmike <ssmike@ydb.tech> | 2022-12-18 16:41:28 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2022-12-18 16:41:28 +0300 |
commit | 226faa452ed596efd80abce593850aa5347a466e (patch) | |
tree | e168378951093fcd06fcf063116141f01d5eb999 | |
parent | 02a8129f6eab05516f817f6ef0cbc292fecd52a1 (diff) | |
download | ydb-226faa452ed596efd80abce593850aa5347a466e.tar.gz |
Apply read limit to readranges DqSource
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp | 102 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 73 |
4 files changed, 167 insertions, 17 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 27e45a5c70..86a67e5966 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -74,6 +74,7 @@ public: AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>)); AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>)); AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>)); + AddHandler(0, &TDqStage::Match, HNDL(ApplyLimitToReadTableSource)); AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase)); AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase)); @@ -138,6 +139,12 @@ protected: return output; } + TMaybeNode<TExprBase> ApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpApplyLimitToReadTableSource(node, ctx, KqpCtx); + DumpAppliedRule("ApplyLimitToReadTableSource", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> ApplyLimitToReadTable(TExprBase node, TExprContext& ctx) { TExprBase output = KqpApplyLimitToReadTable(node, ctx, KqpCtx); DumpAppliedRule("ApplyLimitToReadTable", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp index 36b928d5fe..27c523117d 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp @@ -8,6 +8,108 @@ namespace NKikimr::NKqp::NOpt { using namespace NYql; using namespace NYql::NNodes; +TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + auto stage = node.Cast<TDqStage>(); + TMaybe<size_t> tableSourceIndex; + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + auto input = stage.Inputs().Item(i); + if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) { + tableSourceIndex = i; + } + } + if (!tableSourceIndex) { + return node; + } + + auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>(); + auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>(); + auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings()); + + if (kqpCtx.IsScanQuery()) { + auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path()); + + if (tableDesc.Metadata->Kind != EKikimrTableKind::Olap) { + return node; + } + } + + if (settings.ItemsLimit) { + return node; // already set? + } + + NYql::TNodeOnNodeOwnedMap replaces; + auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); + TExprNode::TPtr foundTake; + bool singleConsumer = true; + VisitExpr(stage.Program().Body().Ptr(), + [&](const TExprNode::TPtr& exprPtr) -> bool { + TExprBase expr(exprPtr); + if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) { + return false; + } + if (auto take = expr.Maybe<TCoTake>()) { + auto maybeSkip = take.Input().Maybe<TCoSkip>(); + auto input = (maybeSkip ? maybeSkip.Cast().Input() : take.Input()).Cast(); + if (input.Raw() == sourceArg.Raw()) { + auto ptr = take.Cast().Ptr(); + if (foundTake && foundTake != ptr) { + singleConsumer = false; + } + foundTake = ptr; + } + } + return true; + }); + + if (!singleConsumer || !foundTake) { + return node; + } + + auto take = TCoTake(foundTake); + + auto maybeSkip = take.Input().Maybe<TCoSkip>(); + auto input = maybeSkip ? maybeSkip.Cast().Input() : take.Input(); + + TMaybeNode<TExprBase> limitValue; + auto maybeTakeCount = take.Count().Maybe<TCoUint64>(); + auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>(); + + if (maybeTakeCount && (!maybeSkip || maybeSkipCount)) { + ui64 totalLimit = FromString<ui64>(maybeTakeCount.Cast().Literal().Value()); + + if (maybeSkipCount) { + totalLimit += FromString<ui64>(maybeSkipCount.Cast().Literal().Value()); + } + + limitValue = Build<TCoUint64>(ctx, node.Pos()) + .Literal<TCoAtom>() + .Value(ToString(totalLimit)).Build() + .Done(); + } else { + limitValue = take.Count(); + if (maybeSkip) { + limitValue = Build<TCoPlus>(ctx, node.Pos()) + .Left(limitValue.Cast()) + .Right(maybeSkip.Cast().Count()) + .Done(); + } + } + + YQL_CLOG(TRACE, ProviderKqp) << "-- set limit items value to " << limitValue.Cast().Ref().Dump(); + + if (limitValue.Maybe<TCoUint64>()) { + settings.SetItemsLimit(limitValue.Cast().Ptr()); + } else { + settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos()) + .Input(limitValue.Cast()) + .Done().Ptr()); + } + replaces[readRangesSource.Settings().Raw()] = settings.BuildNode(ctx, source.Pos()).Ptr(); + + return TExprBase(ctx.ReplaceNodes(node.Ptr(), replaces)); +} + + TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (!node.Maybe<TCoTake>()) { return node; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h index 9bbe1a746a..4e0a239bd2 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h @@ -24,6 +24,8 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpApplyLimitToReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpApplyLimitToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b9e23c882b..0fb4018eb9 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -57,6 +57,13 @@ public: bool NeedResolve = false; + void CopyContinuationToken(TShardState* state) { + if (state->LastKey.DataSize() != 0) { + LastKey = state->LastKey; + } + FirstUnprocessedRequest = state->FirstUnprocessedRequest; + } + TShardState(ui64 tabletId) : TabletId(tabletId) { @@ -215,7 +222,9 @@ public: }; public: - TKqpReadActor(NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args) + TKqpReadActor( + NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, + const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args) : Settings(std::move(settings)) , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << args.TxId << ", task: " << args.TaskId << ". ") , ComputeActorId(args.ComputeActorId) @@ -428,6 +437,10 @@ public: auto newShard = MakeHolder<TShardState>(partition.ShardId); + if (idx == 0 && state) { + newShard->CopyContinuationToken(state.Get()); + } + for (ui64 j = i; j < state->Ranges.size(); ++j) { CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr) << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)); @@ -517,6 +530,17 @@ public: } void StartRead(TShardState* state) { + ui64 limit = 0; + if (Settings.GetItemsLimit()) { + limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount); + } else { + limit = EVREAD_MAX_ROWS; + } + if (limit == 0) { + delete state; + return; + } + THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead()); auto& record = ev->Record; @@ -548,11 +572,7 @@ public: record.MutableTableId()->SetSchemaVersion(Settings.GetTable().GetSchemaVersion()); record.SetReverse(Settings.GetReverse()); - if (Settings.GetItemsLimit()) { - record.SetMaxRows(Settings.GetItemsLimit()); - } else { - record.SetMaxRows(EVREAD_MAX_ROWS); - } + record.SetMaxRows(limit); record.SetMaxBytes(EVREAD_MAX_BYTES); record.SetResultFormat(Settings.GetDataFormat()); @@ -575,18 +595,19 @@ public: return; } - Reads[id].SerializedContinuationToken = record.GetContinuationToken(); if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { for (auto& issue : record.GetStatus().GetIssues()) { CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); } return RetryRead(id); } + Reads[id].SerializedContinuationToken = record.GetContinuationToken(); Reads[id].RegisterMessage(*ev->Get()); YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC); + RecievedRowCount += ev->Get()->GetRowsCount(); Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed"); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); @@ -732,7 +753,7 @@ public: break; } resultVector.push_back(std::move((*batch)[processedRows])); - RowCount += 1; + ProcessedRowCount += 1; bytes += rowSize.AllocatedBytes; } CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows"); @@ -743,13 +764,29 @@ public: Reads[id].Reset(); ResetReads++; } else if (!Reads[id].Finished) { - THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); - request->Record.SetReadId(record.GetReadId()); - request->Record.SetSeqNo(record.GetSeqNo()); - request->Record.SetMaxRows(EVREAD_MAX_ROWS); - request->Record.SetMaxBytes(EVREAD_MAX_BYTES); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), - IEventHandle::FlagTrackDelivery); + ui64 limit = 0; + if (Settings.GetItemsLimit()) { + limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount); + } else { + limit = EVREAD_MAX_ROWS; + } + + if (limit > 0) { + THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); + request->Record.SetReadId(record.GetReadId()); + request->Record.SetSeqNo(record.GetSeqNo()); + request->Record.SetMaxRows(limit); + request->Record.SetMaxBytes(EVREAD_MAX_BYTES); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), + IEventHandle::FlagTrackDelivery); + } else { + auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); + cancel->Record.SetReadId(id); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); + delete state; + Reads[id].Reset(); + ResetReads++; + } } StartTableScan(); @@ -760,8 +797,9 @@ public: Results.pop(); CA_LOG_D("dropping batch"); - if (RunningReads() == 0 || (Settings.HasItemsLimit() && RowCount >= Settings.GetItemsLimit())) { + if (RunningReads() == 0 || (Settings.HasItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) { finished = true; + break; } } else { break; @@ -799,7 +837,8 @@ private: TVector<NScheme::TTypeInfo> KeyColumnTypes; - size_t RowCount = 0; + size_t RecievedRowCount = 0; + size_t ProcessedRowCount = 0; ui64 ResetReads = 0; ui64 ReadId = 0; TVector<TReadState> Reads; |