diff options
author | AlexSm <alex@ydb.tech> | 2025-03-14 12:47:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-14 14:47:55 +0300 |
commit | 6b6beaeb8bf8c9246664ccb1161965d888616aac (patch) | |
tree | 3ce81626c3365794cce478e44cb6e19b85a217a3 | |
parent | f5cf28c593c84581b645fc0a778e35aa1b2e382a (diff) | |
download | ydb-6b6beaeb8bf8c9246664ccb1161965d888616aac.tar.gz |
Precharge for external blobs in DataShard Read Iterator Keys request (#14707) (#15730)
Co-authored-by: Semyon Danilov <senya@ydb.tech>
-rw-r--r-- | ydb/core/protos/config.proto | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 49 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp | 88 |
5 files changed, 152 insertions, 1 deletion
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 9197664a76..ae41b59a29 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1227,6 +1227,12 @@ message TImmediateControlsConfig { MinValue: 0, MaxValue: 134217728, DefaultValue: 0 }]; + + optional uint64 ReadIteratorKeysExtBlobsPrecharge = 21 [(ControlOptions) = { + Description: "Enable new precharge for DataShard read iterator", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; } message TTxLimitControls { diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 761ea6e643..150d5ac165 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -159,6 +159,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , EnableLeaderLeases(1, 0, 1) , MinLeaderLeaseDurationUs(250000, 1000, 5000000) , ChangeRecordDebugPrint(0, 0, 1) + , ReadIteratorKeysExtBlobsPrecharge(0, 0, 1) , DataShardSysTables(InitDataShardSysTables(this)) , ChangeSenderActivator(info->TabletID) , ChangeExchangeSplitter(this) @@ -350,6 +351,8 @@ void TDataShard::IcbRegister() { appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadLo, "DataShardControls.CdcInitialScanReadAheadLo"); appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadHi, "DataShardControls.CdcInitialScanReadAheadHi"); + appData->Icb->RegisterSharedControl(ReadIteratorKeysExtBlobsPrecharge, "DataShardControls.ReadIteratorKeysExtBlobsPrecharge"); + IcbRegistered = true; } } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 24fc3f5344..18e64550eb 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -294,6 +294,8 @@ class TReader { absl::flat_hash_set<ui64> VolatileReadDependencies; bool VolatileWaitForCommit = false; + const bool UseNewPrecharge; + enum class EReadStatus { Done, NeedData, 
@@ -315,6 +317,7 @@ public: , FirstUnprocessedQuery(State.FirstUnprocessedQuery) , LastProcessedKey(State.LastProcessedKey) , LastProcessedKeyErased(State.LastProcessedKeyErased) + , UseNewPrecharge(Self->GetUseNewPrecharge()) { GetTimeFast(&StartTime); EndTime = StartTime; @@ -467,6 +470,52 @@ public: TTransactionContext& txc, ui32 queryIndex) { + if (UseNewPrecharge) { + ui64 rowsLeft = GetRowsLeft(); + ui64 prechargedRowsSize = 0; + ui64 prechargedCount = 0; + + txc.Env.EnableReadMissingReferences(); + + bool ready = true; + while (rowsLeft > 0) { + if (!State.Reverse) { + ++queryIndex; + } else { + --queryIndex; + } + if (!(queryIndex < State.Request->Keys.size())) { + break; + } + if (!PrechargeKey(txc, State.Request->Keys[queryIndex])) { + ready = false; + } else { + const auto key = ToRawTypeValue(State.Request->Keys[queryIndex].GetCells(), TableInfo, true); + + NTable::TRowState rowState; + rowState.Init(State.Columns.size()); + NTable::TSelectStats stats; + txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); + + if (txc.Env.MissingReferencesSize()) { + prechargedRowsSize += EstimateSize(*rowState); + } + } + + prechargedCount++; + + if (ShouldStop(prechargedCount, prechargedRowsSize + txc.Env.MissingReferencesSize())) { + break; + } + + --rowsLeft; + } + + txc.Env.DisableReadMissingReferences(); + + return ready; + } + ui64 rowsLeft = GetRowsLeft(); bool ready = true; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index d64e14291c..1a3204ab28 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1766,6 +1766,11 @@ public: return value != 0; } + bool GetUseNewPrecharge() const { + ui64 value = ReadIteratorKeysExtBlobsPrecharge; + return value != 0; + } + template <typename T> void ReleaseCache(T& tx) { ReleaseTxCache(tx.GetTxCacheUsage()); @@ -2821,6 +2826,8 @@ private: TControlWrapper 
ChangeRecordDebugPrint; + TControlWrapper ReadIteratorKeysExtBlobsPrecharge; + // Set of InRS keys to remove from local DB. THashSet<TReadSetKey> InRSToRemove; TIntrusivePtr<TThrRefBase> DataShardSysTables; diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp index 2422f19198..3b4a425a4c 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp @@ -76,12 +76,17 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { TTestActorRuntime* Runtime; TNode(bool useExternalBlobs, int externalBlobColumns = 1) : ServerSettings(Pm.GetPort(2134)) { + + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetReadIteratorKeysExtBlobsPrecharge(1); + ServerSettings.SetDomainName("Root") .SetUseRealThreads(false) .AddStoragePool("ssd") .AddStoragePool("hdd") .AddStoragePool("ext") - .SetEnableUuidAsPrimaryKey(true); + .SetEnableUuidAsPrimaryKey(true) + .SetControls(controls); Server = new TServer(ServerSettings); @@ -240,6 +245,87 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 10); } + Y_UNIT_TEST(ExtBlobsWithSpecificKeys) { + // Read specific keys via read iterator as it has another code path. + TNode node(true); + + auto server = node.Server; + auto& runtime = *node.Runtime; + auto& sender = node.Sender; + auto shard1 = node.Shard; + auto tableId1 = node.TableId; + + TString largeValue(1_MB, 'L'); + + for (int i = 0; i < 20; i++) { + TString chunkNum = ToString(i); + TString query = R"___( + UPSERT INTO `/Root/table-1` (blob_id, chunk_num, data0) VALUES + (Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"), )___" + chunkNum + ", \"" + largeValue + "\");"; + + ExecSQL(server, sender, query); + } + + { + Cerr << "... 
waiting for stats after upsert" << Endl; + auto stats = WaitTableStats(runtime, shard1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 20); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); + } + + CompactTable(runtime, shard1, tableId1, false); + + { + Cerr << "... waiting for stats after compaction" << Endl; + auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 20); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); + } + + RebootTablet(runtime, shard1, sender); + + auto iteratorCounter = SetupReadIteratorObserver(runtime); + + // Read every second row so that KQP doesn't optimize it to a single range request. + TStringStream query; + query << R"( + SELECT blob_id, chunk_num, data0 + FROM `/Root/table-1` + WHERE )"; + + for (int i = 0; i <= 18; i += 2) { + if (i > 0) { + query << " OR "; + } + query << "(blob_id = Uuid(\"65df1ec1-a97d-47b2-ae56-3c023da6ee8c\") AND chunk_num = " << i << ")"; + } + + query << R"( + ORDER BY blob_id, chunk_num ASC + LIMIT 100; + )"; + + auto readFuture = KqpSimpleSend(runtime, query.Str()); + + Ydb::Table::ExecuteDataQueryResponse res = AwaitResponse(runtime, std::move(readFuture)); + auto& operation = res.Getoperation(); + UNIT_ASSERT_VALUES_EQUAL(operation.status(), Ydb::StatusIds::SUCCESS); + Ydb::Table::ExecuteQueryResult result; + operation.result().UnpackTo(&result); + UNIT_ASSERT_EQUAL(result.result_sets().size(), 1); + auto& resultSet = result.result_sets()[0]; + UNIT_ASSERT_EQUAL(resultSet.rows_size(), 10); + + UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, 1); + UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 2); + UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, 2); + 
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 10); + } + Y_UNIT_TEST(ExtBlobsWithDeletesInTheBeginning) { TNode node(true); |