diff options
author | ssmike <ssmike@ydb.tech> | 2023-02-07 18:22:08 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-02-07 18:22:08 +0300 |
commit | ed20e4a3eae1e67598ba20af409d537f96f51c4f (patch) | |
tree | 9270bc11cef84614283a24f2b528f7e190aec16c | |
parent | 3cb077bd706fad6f1f1f9bb5b91a81666b9fd705 (diff) | |
download | ydb-ed20e4a3eae1e67598ba20af409d537f96f51c4f.tar.gz |
Fix limits
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 86 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 33 |
3 files changed, 102 insertions, 25 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index f1b77ca372f..80fe6d7a06e 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -18,9 +18,6 @@ namespace { -static constexpr ui64 EVREAD_MAX_ROWS = 32767; -static constexpr ui64 EVREAD_MAX_BYTES = 200_MB; - static constexpr ui64 MAX_SHARD_RETRIES = 5; static constexpr ui64 MAX_SHARD_RESOLVES = 3; @@ -29,6 +26,38 @@ bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog:: return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component); } +struct TDefaultRangeEvReadSettings { + NKikimrTxDataShard::TEvRead Data; + + TDefaultRangeEvReadSettings() { + Data.SetMaxRows(32767); + Data.SetMaxBytes(200_MB); + } + +} DefaultRangeEvReadSettings; + +THolder<NKikimr::TEvDataShard::TEvRead> DefaultReadSettings() { + auto result = MakeHolder<NKikimr::TEvDataShard::TEvRead>(); + result->Record.MergeFrom(DefaultRangeEvReadSettings.Data); + return result; +} + +struct TDefaultRangeEvReadAckSettings { + NKikimrTxDataShard::TEvReadAck Data; + + TDefaultRangeEvReadAckSettings() { + Data.SetMaxRows(32767); + Data.SetMaxBytes(200_MB); + } + +} DefaultRangeEvReadAckSettings; + +THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() { + auto result = MakeHolder<NKikimr::TEvDataShard::TEvReadAck>(); + result->Record.MergeFrom(DefaultRangeEvReadAckSettings.Data); + return result; +} + } @@ -40,7 +69,6 @@ using namespace NYql::NDq; using namespace NKikimr; using namespace NKikimr::NDataShard; - class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq::IDqComputeActorAsyncInput { using TBase = TActorBootstrapped<TKqpReadActor>; public: @@ -261,7 +289,7 @@ public: NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args) : Settings(std::move(settings)) - , LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ", CA Id" << args.ComputeActorId << ". ") + , LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ", CA Id " << args.ComputeActorId << ". ") , ComputeActorId(args.ComputeActorId) , InputIndex(args.InputIndex) , TypeEnv(args.TypeEnv) @@ -584,17 +612,16 @@ public: } void StartRead(TShardState* state) { - ui64 limit = 0; + TMaybe<ui64> limit; if (Settings.GetItemsLimit()) { limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount); - } else { - limit = EVREAD_MAX_ROWS; - } - if (limit == 0) { - return; + + if (*limit == 0) { + return; + } } - THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead()); + auto ev = ::DefaultReadSettings(); auto& record = ev->Record; state->FillEvRead(*ev, KeyColumnTypes, Settings.GetReverse()); @@ -627,8 +654,9 @@ public: record.MutableTableId()->SetSchemaVersion(Settings.GetTable().GetSchemaVersion()); record.SetReverse(Settings.GetReverse()); - record.SetMaxRows(limit); - record.SetMaxBytes(EVREAD_MAX_BYTES); + if (limit) { + record.SetMaxRows(*limit); + } record.SetResultFormat(Settings.GetDataFormat()); @@ -835,6 +863,10 @@ public: resultVector.push_back(std::move((*batch)[result.ProcessedRows])); ProcessedRowCount += 1; bytes += rowSize.AllocatedBytes; + if (ProcessedRowCount == Settings.GetItemsLimit()) { + finished = true; + return bytes; + } } CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows"); @@ -844,19 +876,18 @@ public: Reads[id].Reset(); ResetReads++; } else if (!Reads[id].Finished) { - ui64 limit = 0; + TMaybe<ui64> limit; if (Settings.GetItemsLimit()) { limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount); - } else { - limit = EVREAD_MAX_ROWS; } - if (limit > 0) { - THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); + if (!limit || *limit > 0) { + auto request = ::DefaultAckSettings(); request->Record.SetReadId(record.GetReadId()); request->Record.SetSeqNo(record.GetSeqNo()); - request->Record.SetMaxRows(limit); - request->Record.SetMaxBytes(EVREAD_MAX_BYTES); + if (limit) { + request->Record.SetMaxRows(*limit); + } Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); } else { @@ -869,14 +900,11 @@ public: } StartTableScan(); - if (PendingShards.Size() > 0) { - return bytes; - } Results.pop(); CA_LOG_D("dropping batch"); - if (RunningReads() == 0 || (Settings.HasItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) { + if (RunningReads() == 0 || (Settings.GetItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) { finished = true; break; } @@ -1003,5 +1031,13 @@ void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory) { }); } +void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead& read) { + ::DefaultRangeEvReadSettings.Data.MergeFrom(read); +} + +void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) { + ::DefaultRangeEvReadAckSettings.Data.MergeFrom(ack); +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h index 28f3e17873c..e543ec456e1 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.h +++ b/ydb/core/kqp/runtime/kqp_read_actor.h @@ -2,10 +2,18 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> +namespace NKikimrTxDataShard { +class TEvRead; +class TEvReadAck; +} + namespace NKikimr { namespace NKqp { void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory); +void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead&); +void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck&); + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 1b17b842495..5ccc439b1e2 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -1,6 +1,7 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <ydb/core/kqp/runtime/kqp_read_actor.h> namespace NKikimr::NKqp { @@ -3447,6 +3448,38 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } + Y_UNIT_TEST(DqSourceCount) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForDataQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(1); + evread.SetMaxRows(2); + InjectRangeEvReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + evreadack.SetMaxRows(2); + InjectRangeEvReadAckSettings(evreadack); + + { + auto result = session.ExecuteDataQuery(R"( + SELECT COUNT(*) FROM `/Root/EightShard`; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(DqSource) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; |