aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author AlexSm <alex@ydb.tech> 2025-03-14 12:47:55 +0100
committer GitHub <noreply@github.com> 2025-03-14 14:47:55 +0300
commit 6b6beaeb8bf8c9246664ccb1161965d888616aac (patch)
tree 3ce81626c3365794cce478e44cb6e19b85a217a3
parent f5cf28c593c84581b645fc0a778e35aa1b2e382a (diff)
download ydb-6b6beaeb8bf8c9246664ccb1161965d888616aac.tar.gz
Precharge for external blobs in DataShard Read Iterator Keys request (#14707) (#15730)
Co-authored-by: Semyon Danilov <senya@ydb.tech>
-rw-r--r-- ydb/core/protos/config.proto 6
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 3
-rw-r--r-- ydb/core/tx/datashard/datashard__read_iterator.cpp 49
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h 7
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp 88
5 files changed, 152 insertions, 1 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 9197664a76..ae41b59a29 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1227,6 +1227,12 @@ message TImmediateControlsConfig {
MinValue: 0,
MaxValue: 134217728,
DefaultValue: 0 }];
+
+ // Control value for the DataShard read iterator precharge switch.
+ // The allowed range is 0..1 and the default is 0.
+ optional uint64 ReadIteratorKeysExtBlobsPrecharge = 21 [(ControlOptions) = {
+ Description: "Enable new precharge for DataShard read iterator",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
}
message TTxLimitControls {
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 761ea6e643..150d5ac165 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -159,6 +159,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, EnableLeaderLeases(1, 0, 1)
, MinLeaderLeaseDurationUs(250000, 1000, 5000000)
, ChangeRecordDebugPrint(0, 0, 1)
+ , ReadIteratorKeysExtBlobsPrecharge(0, 0, 1)
, DataShardSysTables(InitDataShardSysTables(this))
, ChangeSenderActivator(info->TabletID)
, ChangeExchangeSplitter(this)
@@ -350,6 +351,8 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadLo, "DataShardControls.CdcInitialScanReadAheadLo");
appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadHi, "DataShardControls.CdcInitialScanReadAheadHi");
+ appData->Icb->RegisterSharedControl(ReadIteratorKeysExtBlobsPrecharge, "DataShardControls.ReadIteratorKeysExtBlobsPrecharge");
+
IcbRegistered = true;
}
}
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 24fc3f5344..18e64550eb 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -294,6 +294,8 @@ class TReader {
absl::flat_hash_set<ui64> VolatileReadDependencies;
bool VolatileWaitForCommit = false;
+ const bool UseNewPrecharge;
+
enum class EReadStatus {
Done,
NeedData,
@@ -315,6 +317,7 @@ public:
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
, LastProcessedKey(State.LastProcessedKey)
, LastProcessedKeyErased(State.LastProcessedKeyErased)
+ , UseNewPrecharge(Self->GetUseNewPrecharge())
{
GetTimeFast(&StartTime);
EndTime = StartTime;
@@ -467,6 +470,52 @@ public:
TTransactionContext& txc,
ui32 queryIndex)
{
+ // Optional precharge path, selected by the UseNewPrecharge flag. In addition to
+ // PrechargeKey it issues a Select per key while missing-reference reading is enabled.
+ // NOTE(review): presumably this lets external blob references be collected for the
+ // upcoming keys - confirm against the Env implementation.
+ if (UseNewPrecharge) {
+ ui64 rowsLeft = GetRowsLeft();
+ ui64 prechargedRowsSize = 0;
+ ui64 prechargedCount = 0;
+
+ // Turned on for the whole loop; turned off again right before returning.
+ txc.Env.EnableReadMissingReferences();
+
+ bool ready = true;
+ while (rowsLeft > 0) {
+ // Step to the neighbouring key first, so the key at the incoming queryIndex
+ // is not precharged by this loop.
+ if (!State.Reverse) {
+ ++queryIndex;
+ } else {
+ --queryIndex;
+ }
+ // queryIndex is unsigned: decrementing past zero wraps to a large value,
+ // which fails this same bound check, so one test covers both directions.
+ if (!(queryIndex < State.Request->Keys.size())) {
+ break;
+ }
+ if (!PrechargeKey(txc, State.Request->Keys[queryIndex])) {
+ // Keep iterating after a failure so the remaining keys are still precharged.
+ ready = false;
+ } else {
+ // PrechargeKey succeeded: perform a Select for the same key.
+ const auto key = ToRawTypeValue(State.Request->Keys[queryIndex].GetCells(), TableInfo, true);
+
+ NTable::TRowState rowState;
+ rowState.Init(State.Columns.size());
+ NTable::TSelectStats stats;
+ txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
+
+ // The row size is only accumulated while the environment reports missing references.
+ if (txc.Env.MissingReferencesSize()) {
+ prechargedRowsSize += EstimateSize(*rowState);
+ }
+ }
+
+ // Counts every visited key, including those whose PrechargeKey failed.
+ prechargedCount++;
+
+ // NOTE(review): ShouldStop presumably applies the read's row/byte limits - confirm.
+ // Breaking here skips the rowsLeft decrement but still reaches the Disable call below.
+ if (ShouldStop(prechargedCount, prechargedRowsSize + txc.Env.MissingReferencesSize())) {
+ break;
+ }
+
+ --rowsLeft;
+ }
+
+ txc.Env.DisableReadMissingReferences();
+
+ return ready;
+ }
+
ui64 rowsLeft = GetRowsLeft();
bool ready = true;
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index d64e14291c..1a3204ab28 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1766,6 +1766,11 @@ public:
return value != 0;
}
+ // Reports whether the ReadIteratorKeysExtBlobsPrecharge control currently holds a non-zero value.
+ bool GetUseNewPrecharge() const {
+ const ui64 controlValue = ReadIteratorKeysExtBlobsPrecharge;
+ return controlValue > 0;
+ }
+
template <typename T>
void ReleaseCache(T& tx) {
ReleaseTxCache(tx.GetTxCacheUsage());
@@ -2821,6 +2826,8 @@ private:
TControlWrapper ChangeRecordDebugPrint;
+ TControlWrapper ReadIteratorKeysExtBlobsPrecharge;
+
// Set of InRS keys to remove from local DB.
THashSet<TReadSetKey> InRSToRemove;
TIntrusivePtr<TThrRefBase> DataShardSysTables;
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
index 2422f19198..3b4a425a4c 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
@@ -76,12 +76,17 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
TTestActorRuntime* Runtime;
TNode(bool useExternalBlobs, int externalBlobColumns = 1) : ServerSettings(Pm.GetPort(2134)) {
+
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetReadIteratorKeysExtBlobsPrecharge(1);
+
ServerSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.AddStoragePool("ssd")
.AddStoragePool("hdd")
.AddStoragePool("ext")
- .SetEnableUuidAsPrimaryKey(true);
+ .SetEnableUuidAsPrimaryKey(true)
+ .SetControls(controls);
Server = new TServer(ServerSettings);
@@ -240,6 +245,87 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 10);
}
+ // Reads ten individual keys (not a range) from a table created with external blobs
+ // enabled and checks the row count plus the read-iterator / blob request counters.
+ Y_UNIT_TEST(ExtBlobsWithSpecificKeys) {
+ // Read specific keys via read iterator as it has another code path.
+ TNode node(true);
+
+ auto server = node.Server;
+ auto& runtime = *node.Runtime;
+ auto& sender = node.Sender;
+ auto shard1 = node.Shard;
+ auto tableId1 = node.TableId;
+
+ TString largeValue(1_MB, 'L');
+
+ // Insert 20 rows (chunk_num 0..19) under one blob_id, each carrying a 1 MB value.
+ for (int i = 0; i < 20; i++) {
+ TString chunkNum = ToString(i);
+ TString query = R"___(
+ UPSERT INTO `/Root/table-1` (blob_id, chunk_num, data0) VALUES
+ (Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"), )___" + chunkNum + ", \"" + largeValue + "\");";
+
+ ExecSQL(server, sender, query);
+ }
+
+ {
+ Cerr << "... waiting for stats after upsert" << Endl;
+ auto stats = WaitTableStats(runtime, shard1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 20);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
+ }
+
+ CompactTable(runtime, shard1, tableId1, false);
+
+ {
+ Cerr << "... waiting for stats after compaction" << Endl;
+ auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1;
+ });
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 20);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
+ }
+
+ // NOTE(review): the reboot presumably drops in-memory state so the read below has to
+ // fetch blobs again - confirm.
+ RebootTablet(runtime, shard1, sender);
+
+ auto iteratorCounter = SetupReadIteratorObserver(runtime);
+
+ // Read every second row so that KQP doesn't optimize it to a single range request.
+ TStringStream query;
+ query << R"(
+ SELECT blob_id, chunk_num, data0
+ FROM `/Root/table-1`
+ WHERE )";
+
+ // Even chunk_num values 0..18 give ten point lookups joined with OR.
+ for (int i = 0; i <= 18; i += 2) {
+ if (i > 0) {
+ query << " OR ";
+ }
+ query << "(blob_id = Uuid(\"65df1ec1-a97d-47b2-ae56-3c023da6ee8c\") AND chunk_num = " << i << ")";
+ }
+
+ query << R"(
+ ORDER BY blob_id, chunk_num ASC
+ LIMIT 100;
+ )";
+
+ auto readFuture = KqpSimpleSend(runtime, query.Str());
+
+ Ydb::Table::ExecuteDataQueryResponse res = AwaitResponse(runtime, std::move(readFuture));
+ auto& operation = res.Getoperation();
+ UNIT_ASSERT_VALUES_EQUAL(operation.status(), Ydb::StatusIds::SUCCESS);
+ Ydb::Table::ExecuteQueryResult result;
+ // Fail right here if the payload is not an ExecuteQueryResult, rather than
+ // continuing with a default-constructed message.
+ UNIT_ASSERT(operation.result().UnpackTo(&result));
+ // VALUES_EQUAL prints both operands on mismatch.
+ UNIT_ASSERT_VALUES_EQUAL(result.result_sets().size(), 1);
+ auto& resultSet = result.result_sets()[0];
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.rows_size(), 10);
+
+ // Ten keys were requested, hence ten blob requests are expected.
+ UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, 1);
+ UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 2);
+ UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, 2);
+ UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 10);
+ }
+
Y_UNIT_TEST(ExtBlobsWithDeletesInTheBeginning) {
TNode node(true);