aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-02-07 18:22:08 +0300
committerssmike <ssmike@ydb.tech>2023-02-07 18:22:08 +0300
commited20e4a3eae1e67598ba20af409d537f96f51c4f (patch)
tree9270bc11cef84614283a24f2b528f7e190aec16c
parent3cb077bd706fad6f1f1f9bb5b91a81666b9fd705 (diff)
downloadydb-ed20e4a3eae1e67598ba20af409d537f96f51c4f.tar.gz
Fix limits
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp86
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.h8
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp33
3 files changed, 102 insertions, 25 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index f1b77ca372f..80fe6d7a06e 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -18,9 +18,6 @@
namespace {
-static constexpr ui64 EVREAD_MAX_ROWS = 32767;
-static constexpr ui64 EVREAD_MAX_BYTES = 200_MB;
-
static constexpr ui64 MAX_SHARD_RETRIES = 5;
static constexpr ui64 MAX_SHARD_RESOLVES = 3;
@@ -29,6 +26,38 @@ bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog::
return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component);
}
+struct TDefaultRangeEvReadSettings {
+ NKikimrTxDataShard::TEvRead Data;
+
+ TDefaultRangeEvReadSettings() {
+ Data.SetMaxRows(32767);
+ Data.SetMaxBytes(200_MB);
+ }
+
+} DefaultRangeEvReadSettings;
+
+THolder<NKikimr::TEvDataShard::TEvRead> DefaultReadSettings() {
+ auto result = MakeHolder<NKikimr::TEvDataShard::TEvRead>();
+ result->Record.MergeFrom(DefaultRangeEvReadSettings.Data);
+ return result;
+}
+
+struct TDefaultRangeEvReadAckSettings {
+ NKikimrTxDataShard::TEvReadAck Data;
+
+ TDefaultRangeEvReadAckSettings() {
+ Data.SetMaxRows(32767);
+ Data.SetMaxBytes(200_MB);
+ }
+
+} DefaultRangeEvReadAckSettings;
+
+THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() {
+ auto result = MakeHolder<NKikimr::TEvDataShard::TEvReadAck>();
+ result->Record.MergeFrom(DefaultRangeEvReadAckSettings.Data);
+ return result;
+}
+
}
@@ -40,7 +69,6 @@ using namespace NYql::NDq;
using namespace NKikimr;
using namespace NKikimr::NDataShard;
-
class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq::IDqComputeActorAsyncInput {
using TBase = TActorBootstrapped<TKqpReadActor>;
public:
@@ -261,7 +289,7 @@ public:
NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings,
const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args)
: Settings(std::move(settings))
- , LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ", CA Id" << args.ComputeActorId << ". ")
+ , LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ", CA Id " << args.ComputeActorId << ". ")
, ComputeActorId(args.ComputeActorId)
, InputIndex(args.InputIndex)
, TypeEnv(args.TypeEnv)
@@ -584,17 +612,16 @@ public:
}
void StartRead(TShardState* state) {
- ui64 limit = 0;
+ TMaybe<ui64> limit;
if (Settings.GetItemsLimit()) {
limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount);
- } else {
- limit = EVREAD_MAX_ROWS;
- }
- if (limit == 0) {
- return;
+
+ if (*limit == 0) {
+ return;
+ }
}
- THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead());
+ auto ev = ::DefaultReadSettings();
auto& record = ev->Record;
state->FillEvRead(*ev, KeyColumnTypes, Settings.GetReverse());
@@ -627,8 +654,9 @@ public:
record.MutableTableId()->SetSchemaVersion(Settings.GetTable().GetSchemaVersion());
record.SetReverse(Settings.GetReverse());
- record.SetMaxRows(limit);
- record.SetMaxBytes(EVREAD_MAX_BYTES);
+ if (limit) {
+ record.SetMaxRows(*limit);
+ }
record.SetResultFormat(Settings.GetDataFormat());
@@ -835,6 +863,10 @@ public:
resultVector.push_back(std::move((*batch)[result.ProcessedRows]));
ProcessedRowCount += 1;
bytes += rowSize.AllocatedBytes;
+ if (ProcessedRowCount == Settings.GetItemsLimit()) {
+ finished = true;
+ return bytes;
+ }
}
CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows");
@@ -844,19 +876,18 @@ public:
Reads[id].Reset();
ResetReads++;
} else if (!Reads[id].Finished) {
- ui64 limit = 0;
+ TMaybe<ui64> limit;
if (Settings.GetItemsLimit()) {
limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount);
- } else {
- limit = EVREAD_MAX_ROWS;
}
- if (limit > 0) {
- THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
+ if (!limit || *limit > 0) {
+ auto request = ::DefaultAckSettings();
request->Record.SetReadId(record.GetReadId());
request->Record.SetSeqNo(record.GetSeqNo());
- request->Record.SetMaxRows(limit);
- request->Record.SetMaxBytes(EVREAD_MAX_BYTES);
+ if (limit) {
+ request->Record.SetMaxRows(*limit);
+ }
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
} else {
@@ -869,14 +900,11 @@ public:
}
StartTableScan();
- if (PendingShards.Size() > 0) {
- return bytes;
- }
Results.pop();
CA_LOG_D("dropping batch");
- if (RunningReads() == 0 || (Settings.HasItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) {
+ if (RunningReads() == 0 || (Settings.GetItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) {
finished = true;
break;
}
@@ -1003,5 +1031,13 @@ void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory) {
});
}
+void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead& read) {
+ ::DefaultRangeEvReadSettings.Data.MergeFrom(read);
+}
+
+void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) {
+ ::DefaultRangeEvReadAckSettings.Data.MergeFrom(ack);
+}
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h
index 28f3e17873c..e543ec456e1 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.h
+++ b/ydb/core/kqp/runtime/kqp_read_actor.h
@@ -2,10 +2,18 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+namespace NKikimrTxDataShard {
+class TEvRead;
+class TEvReadAck;
+}
+
namespace NKikimr {
namespace NKqp {
void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory);
+void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead&);
+void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck&);
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 1b17b842495..5ccc439b1e2 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <ydb/core/kqp/runtime/kqp_read_actor.h>
namespace NKikimr::NKqp {
@@ -3447,6 +3448,38 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
+ Y_UNIT_TEST(DqSourceCount) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForDataQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(1);
+ evread.SetMaxRows(2);
+ InjectRangeEvReadSettings(evread);
+
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ evreadack.SetMaxRows(2);
+ InjectRangeEvReadAckSettings(evreadack);
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ SELECT COUNT(*) FROM `/Root/EightShard`;
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
Y_UNIT_TEST(DqSource) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;