about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ssmike <ssmike@ydb.tech> 2022-12-18 16:41:28 +0300
committer ssmike <ssmike@ydb.tech> 2022-12-18 16:41:28 +0300
commit 226faa452ed596efd80abce593850aa5347a466e (patch)
tree e168378951093fcd06fcf063116141f01d5eb999
parent 02a8129f6eab05516f817f6ef0cbc292fecd52a1 (diff)
download ydb-226faa452ed596efd80abce593850aa5347a466e.tar.gz
Apply read limit to readranges DqSource
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy.cpp 7
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp 102
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h 2
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp 73
4 files changed, 167 insertions, 17 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 27e45a5c70..86a67e5966 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -74,6 +74,7 @@ public:
AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>));
AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
+ AddHandler(0, &TDqStage::Match, HNDL(ApplyLimitToReadTableSource));
AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase));
@@ -138,6 +139,12 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> ApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx) { // Rule handler: thin wrapper that runs KqpApplyLimitToReadTableSource on `node` and logs the result.
+ TExprBase output = KqpApplyLimitToReadTableSource(node, ctx, KqpCtx); // KqpCtx is the member optimize context passed through to the rule
+ DumpAppliedRule("ApplyLimitToReadTableSource", node.Ptr(), output.Ptr(), ctx); // hands both the input and the output node to the rule dump helper
+ return output;
+ }
+
TMaybeNode<TExprBase> ApplyLimitToReadTable(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpApplyLimitToReadTable(node, ctx, KqpCtx);
DumpAppliedRule("ApplyLimitToReadTable", node.Ptr(), output.Ptr(), ctx);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
index 36b928d5fe..27c523117d 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
@@ -8,6 +8,108 @@ namespace NKikimr::NKqp::NOpt {
using namespace NYql;
using namespace NYql::NNodes;
+TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { // Rule over a TDqStage: copies the limit of a Take (optionally over a Skip) applied directly to a read-ranges source argument into that source's ItemsLimit setting; returns `node` unchanged whenever the rule does not apply.
+ auto stage = node.Cast<TDqStage>();
+ TMaybe<size_t> tableSourceIndex; // index of the stage input that is a TDqSource carrying TKqpReadRangesSourceSettings
+ for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+ auto input = stage.Inputs().Item(i);
+ if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) {
+ tableSourceIndex = i; // no break: if several inputs match, the last one is used
+ }
+ }
+ if (!tableSourceIndex) {
+ return node; // no read-ranges source among the stage inputs
+ }
+
+ auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>();
+ auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
+ auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings());
+
+ if (kqpCtx.IsScanQuery()) { // for scan queries the rule is applied to OLAP tables only
+ auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path());
+
+ if (tableDesc.Metadata->Kind != EKikimrTableKind::Olap) {
+ return node;
+ }
+ }
+
+ if (settings.ItemsLimit) {
+ return node; // already set?
+ }
+
+ NYql::TNodeOnNodeOwnedMap replaces; // old node -> new node, consumed by ReplaceNodes at the end
+ auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); // program argument at the same position as the source input
+ TExprNode::TPtr foundTake; // last Take found whose input (through an optional Skip) is sourceArg
+ bool singleConsumer = true; // cleared when a second, different Take over sourceArg is seen
+ VisitExpr(stage.Program().Body().Ptr(),
+ [&](const TExprNode::TPtr& exprPtr) -> bool {
+ TExprBase expr(exprPtr);
+ if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) {
+ return false; // connections / precomputes are skipped (returning false presumably stops descent — confirm against VisitExpr)
+ }
+ if (auto take = expr.Maybe<TCoTake>()) {
+ auto maybeSkip = take.Input().Maybe<TCoSkip>();
+ auto input = (maybeSkip ? maybeSkip.Cast().Input() : take.Input()).Cast(); // look through an optional Skip directly under the Take
+ if (input.Raw() == sourceArg.Raw()) {
+ auto ptr = take.Cast().Ptr();
+ if (foundTake && foundTake != ptr) { // NOTE(review): only other Takes count as extra consumers; non-Take uses of sourceArg are not detected here — verify this is safe
+ singleConsumer = false;
+ }
+ foundTake = ptr;
+ }
+ }
+ return true;
+ });
+
+ if (!singleConsumer || !foundTake) {
+ return node; // need exactly one Take over the source argument
+ }
+
+ auto take = TCoTake(foundTake);
+
+ auto maybeSkip = take.Input().Maybe<TCoSkip>();
+ auto input = maybeSkip ? maybeSkip.Cast().Input() : take.Input(); // NOTE(review): `input` is not read again below
+
+ TMaybeNode<TExprBase> limitValue;
+ auto maybeTakeCount = take.Count().Maybe<TCoUint64>();
+ auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>(); // NOTE(review): maybeSkip may be empty here; relies on maybe-node chaining tolerating that — confirm
+
+ if (maybeTakeCount && (!maybeSkip || maybeSkipCount)) { // every count involved is a Uint64 literal: fold into one literal
+ ui64 totalLimit = FromString<ui64>(maybeTakeCount.Cast().Literal().Value());
+
+ if (maybeSkipCount) {
+ totalLimit += FromString<ui64>(maybeSkipCount.Cast().Literal().Value()); // NOTE(review): unchecked ui64 addition — very large take + skip would wrap around; confirm inputs are bounded
+ }
+
+ limitValue = Build<TCoUint64>(ctx, node.Pos())
+ .Literal<TCoAtom>()
+ .Value(ToString(totalLimit)).Build()
+ .Done();
+ } else { // at least one count is not a literal: keep the limit as an expression
+ limitValue = take.Count();
+ if (maybeSkip) {
+ limitValue = Build<TCoPlus>(ctx, node.Pos()) // limit = take count + skip count
+ .Left(limitValue.Cast())
+ .Right(maybeSkip.Cast().Count())
+ .Done();
+ }
+ }
+
+ YQL_CLOG(TRACE, ProviderKqp) << "-- set limit items value to " << limitValue.Cast().Ref().Dump();
+
+ if (limitValue.Maybe<TCoUint64>()) {
+ settings.SetItemsLimit(limitValue.Cast().Ptr()); // a literal limit is stored as is
+ } else {
+ settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos()) // a non-literal limit is wrapped into TDqPrecompute
+ .Input(limitValue.Cast())
+ .Done().Ptr());
+ }
+ replaces[readRangesSource.Settings().Raw()] = settings.BuildNode(ctx, source.Pos()).Ptr(); // swap the old settings node for the rebuilt one
+
+ return TExprBase(ctx.ReplaceNodes(node.Ptr(), replaces)); // the Take/Skip nodes themselves are left in place; only the source settings change
+}
+
+
TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!node.Maybe<TCoTake>()) {
return node;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index 9bbe1a746a..4e0a239bd2 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -24,6 +24,8 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
+NYql::NNodes::TExprBase KqpApplyLimitToReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); // Physical rule: sets the items limit on a stage's read-ranges source from a Take/Skip over it; returns `node` unchanged when not applicable.
+
NYql::NNodes::TExprBase KqpApplyLimitToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b9e23c882b..0fb4018eb9 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -57,6 +57,13 @@ public:
bool NeedResolve = false;
+ void CopyContinuationToken(TShardState* state) { // Copies LastKey (only when non-empty) and FirstUnprocessedRequest from `state` into this object; `state` is dereferenced without a null check.
+ if (state->LastKey.DataSize() != 0) { // an empty source LastKey leaves this object's LastKey untouched
+ LastKey = state->LastKey;
+ }
+ FirstUnprocessedRequest = state->FirstUnprocessedRequest; // copied unconditionally
+ }
+
TShardState(ui64 tabletId)
: TabletId(tabletId)
{
@@ -215,7 +222,9 @@ public:
};
public:
- TKqpReadActor(NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args)
+ TKqpReadActor(
+ NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings,
+ const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args)
: Settings(std::move(settings))
, LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << args.TxId << ", task: " << args.TaskId << ". ")
, ComputeActorId(args.ComputeActorId)
@@ -428,6 +437,10 @@ public:
auto newShard = MakeHolder<TShardState>(partition.ShardId);
+ if (idx == 0 && state) {
+ newShard->CopyContinuationToken(state.Get());
+ }
+
for (ui64 j = i; j < state->Ranges.size(); ++j) {
CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr)
<< " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
@@ -517,6 +530,17 @@ public:
}
void StartRead(TShardState* state) {
+ ui64 limit = 0;
+ if (Settings.GetItemsLimit()) {
+ limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount);
+ } else {
+ limit = EVREAD_MAX_ROWS;
+ }
+ if (limit == 0) {
+ delete state;
+ return;
+ }
+
THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead());
auto& record = ev->Record;
@@ -548,11 +572,7 @@ public:
record.MutableTableId()->SetSchemaVersion(Settings.GetTable().GetSchemaVersion());
record.SetReverse(Settings.GetReverse());
- if (Settings.GetItemsLimit()) {
- record.SetMaxRows(Settings.GetItemsLimit());
- } else {
- record.SetMaxRows(EVREAD_MAX_ROWS);
- }
+ record.SetMaxRows(limit);
record.SetMaxBytes(EVREAD_MAX_BYTES);
record.SetResultFormat(Settings.GetDataFormat());
@@ -575,18 +595,19 @@ public:
return;
}
- Reads[id].SerializedContinuationToken = record.GetContinuationToken();
if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
for (auto& issue : record.GetStatus().GetIssues()) {
CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
}
return RetryRead(id);
}
+ Reads[id].SerializedContinuationToken = record.GetContinuationToken();
Reads[id].RegisterMessage(*ev->Get());
YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+ RecievedRowCount += ev->Get()->GetRowsCount();
Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed");
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
@@ -732,7 +753,7 @@ public:
break;
}
resultVector.push_back(std::move((*batch)[processedRows]));
- RowCount += 1;
+ ProcessedRowCount += 1;
bytes += rowSize.AllocatedBytes;
}
CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows");
@@ -743,13 +764,29 @@ public:
Reads[id].Reset();
ResetReads++;
} else if (!Reads[id].Finished) {
- THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
- request->Record.SetReadId(record.GetReadId());
- request->Record.SetSeqNo(record.GetSeqNo());
- request->Record.SetMaxRows(EVREAD_MAX_ROWS);
- request->Record.SetMaxBytes(EVREAD_MAX_BYTES);
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
- IEventHandle::FlagTrackDelivery);
+ ui64 limit = 0;
+ if (Settings.GetItemsLimit()) {
+ limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount);
+ } else {
+ limit = EVREAD_MAX_ROWS;
+ }
+
+ if (limit > 0) {
+ THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
+ request->Record.SetReadId(record.GetReadId());
+ request->Record.SetSeqNo(record.GetSeqNo());
+ request->Record.SetMaxRows(limit);
+ request->Record.SetMaxBytes(EVREAD_MAX_BYTES);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
+ IEventHandle::FlagTrackDelivery);
+ } else {
+ auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
+ cancel->Record.SetReadId(id);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
+ delete state;
+ Reads[id].Reset();
+ ResetReads++;
+ }
}
StartTableScan();
@@ -760,8 +797,9 @@ public:
Results.pop();
CA_LOG_D("dropping batch");
- if (RunningReads() == 0 || (Settings.HasItemsLimit() && RowCount >= Settings.GetItemsLimit())) {
+ if (RunningReads() == 0 || (Settings.HasItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) {
finished = true;
+ break;
}
} else {
break;
@@ -799,7 +837,8 @@ private:
TVector<NScheme::TTypeInfo> KeyColumnTypes;
- size_t RowCount = 0;
+ size_t RecievedRowCount = 0; // rows reported by successful read results (summed GetRowsCount()); subtracted from ItemsLimit to size MaxRows of later read/ack requests. NOTE(review): identifier is misspelled ("Received").
+ size_t ProcessedRowCount = 0; // rows moved into the returned result vector; compared with ItemsLimit to decide that the source is finished
ui64 ResetReads = 0;
ui64 ReadId = 0;
TVector<TReadState> Reads;