diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-07-14 18:37:33 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-07-14 18:37:33 +0300 |
commit | d964d7044c1728202bacd7a4abab1c35d6f1ea70 (patch) | |
tree | e61da790302a2c7c2b08544a2aa4304494725e50 | |
parent | bc8bb510e3019d32768824d6c6e99571f941d8c5 (diff) | |
download | ydb-d964d7044c1728202bacd7a4abab1c35d6f1ea70.tar.gz |
wait for async index receive all data before check the content. KIKIMR-18753
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp index 1b2ea03c11..0993ef4c28 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp @@ -1110,6 +1110,39 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { ])", FormatResultSetYson(result.GetResultSet(0))); } + void WaitForAsyncIndexContent(TSession session, const TString& table, size_t rowsInserted) { + UNIT_ASSERT_C(rowsInserted, "rowsInserted must be set"); + size_t rowsRead = 0; + size_t attempt = 0; + while (rowsRead != rowsInserted) { + auto it = session.ReadTable(table).GetValueSync(); + UNIT_ASSERT(it.IsSuccess()); + rowsRead = 0; + for (;;) { + auto tablePart = it.ReadNext().GetValueSync(); + if (tablePart.EOS()) { + break; + } + + UNIT_ASSERT_VALUES_EQUAL(tablePart.IsSuccess(), true); + + auto rsParser = TResultSetParser(tablePart.ExtractPart()); + + while (rsParser.TryNextRow()) { + rowsRead++; + } + } + + if (attempt) + Sleep(TDuration::Seconds(1)); + + if (attempt++ > 10) { + UNIT_ASSERT_C(false, "unable to get expected rows count during async index update"); + } + } + UNIT_ASSERT_VALUES_EQUAL(rowsInserted, rowsRead); + } + void CheckWriteIntoRenamingIndex(bool asyncIndex) { TKikimrRunner kikimr; @@ -1184,10 +1217,16 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { } }, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + const TString indexPath = "/Root/MultiShardIndexed/index_new/indexImplTable"; + + if (asyncIndex) { + WaitForAsyncIndexContent(session, indexPath, rowsInserted); + } + { TReadTableSettings settings; settings.Ordered(true); - auto it = session.ReadTable("/Root/MultiShardIndexed/index_new/indexImplTable", settings).GetValueSync(); + auto it = session.ReadTable(indexPath, settings).GetValueSync(); UNIT_ASSERT(it.IsSuccess()); int shard = 0; |