diff options
author | dcherednik <dcherednik@ydb.tech> | 2022-07-20 16:05:42 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2022-07-20 16:05:42 +0300 |
commit | 53e2e801caef50a19bb28d7f0c59ddbafe334950 (patch) | |
tree | c36e12f7fe8b5a8da70972df07b6d015e2c9a39a | |
parent | 070dcb976b8e2c3f36068da9b27be329346b90a5 (diff) | |
download | ydb-53e2e801caef50a19bb28d7f0c59ddbafe334950.tar.gz |
Check write in to index table during index rename produce correct result.
-rw-r--r-- | ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp | 126 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scheme_ut.cpp | 1 |
2 files changed, 123 insertions, 4 deletions
diff --git a/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp index 77fea831b7e..aba32bdb25e 100644 --- a/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp +++ b/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp @@ -4,6 +4,7 @@ #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <library/cpp/json/json_reader.h> +#include <library/cpp/threading/local_executor/local_executor.h> #include <util/string/printf.h> @@ -18,15 +19,16 @@ namespace { NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query) { const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); - return session.ExecuteDataQuery(query, txSettings).ExtractValueSync(); + return session.ExecuteDataQuery(query, txSettings, + TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync(); } NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, TParams& params) { const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); - return session.ExecuteDataQuery(query, txSettings, params).ExtractValueSync(); + return session.ExecuteDataQuery(query, txSettings, params, + TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync(); } - void CreateTableWithMultishardIndex(Tests::TClient& client) { const TString scheme = R"(Name: "MultiShardIndexed" Columns { Name: "key" Type: "Uint64" } @@ -1095,6 +1097,124 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { [[4294967295u];[4u];["v4"]] ])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST_NEW_ENGINE(WriteIntoRenamingIndex) { + TKikimrRunner kikimr; + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + CreateTableWithMultishardIndex(kikimr.GetTestClient()); + + auto buildParam = [&db](ui64 id) { + return db.GetParamsBuilder() + .AddParam("$rows") + .BeginList() + .AddListItem() + .BeginStruct() + .AddMember("key").Uint64(id) + .AddMember("fk").Uint32(id) + .AddMember("value").Utf8("v1") + .EndStruct() + .AddListItem() + .BeginStruct() + .AddMember("key").Uint64(id << 31) + .AddMember("fk").Uint32(id + (1u << 31)) + .AddMember("value").Utf8("v2") + .EndStruct() + .EndList() + .Build() + .Build(); + }; + + TMutex SyncMutex; + TCondVar SyncCondVar; + const size_t rows = 1000; + size_t rowsInserted = 0; + + NPar::LocalExecutor().RunAdditionalThreads(2); + NPar::LocalExecutor().ExecRange([&](int id) mutable { + switch (id) { + case 0: { + size_t count = rows; + while (--count) { + const TString q(R"( + DECLARE $rows AS List < Struct<key: Uint64, fk: Uint32, value: Utf8 > >; + UPSERT INTO `/Root/MultiShardIndexed` (key, fk, value) + SELECT key, fk, value FROM AS_TABLE($rows); + )"); + auto r = db.RetryOperationSync([=](TSession s) { + auto p = buildParam(count); + auto t = ExecuteDataQuery(s, q, p); + return t; + }); + UNIT_ASSERT_C(r.IsSuccess(), r.GetIssues().ToString()); + rowsInserted += 2; + // Let write 10 rows before the index will be renamed + if (count == (rows - 10)) { + TGuard<TMutex> guard(SyncMutex); + SyncCondVar.Signal(); + } + } + } + break; + case 1: { + TGuard<TMutex> guard(SyncMutex); + SyncCondVar.WaitI(SyncMutex); + auto s = db.CreateSession().GetValueSync().GetSession(); + auto st = s.ExecuteSchemeQuery(R"( + ALTER TABLE `/Root/MultiShardIndexed` RENAME INDEX index TO index_new; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(st.GetStatus(), EStatus::SUCCESS, st.GetIssues().ToString()); + } + break; + default: + Y_FAIL("Unknown id"); + } + }, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + + { + TReadTableSettings settings; + settings.Ordered(true); + auto it = session.ReadTable("/Root/MultiShardIndexed/index_new/indexImplTable", settings).GetValueSync(); + UNIT_ASSERT(it.IsSuccess()); + + int shard = 0; + size_t rowsRead = 0; + for (;;) { + auto tablePart = it.ReadNext().GetValueSync(); + if (tablePart.EOS()) { + break; + } + + UNIT_ASSERT_VALUES_EQUAL(tablePart.IsSuccess(), true); + auto resultSet = tablePart.ExtractPart(); + + auto rsParser = TResultSetParser(resultSet); + + ui32 startVal = 1; + while (rsParser.TryNextRow()) { + auto val = rsParser.GetValue(0); + TValueParser vp(val); + vp.OpenOptional(); + if (!vp.IsNull()) { + rowsRead++; + switch (shard) { + case 0: + UNIT_ASSERT_VALUES_EQUAL(vp.GetUint32(), startVal++); + break; + case 1: + UNIT_ASSERT_VALUES_EQUAL(vp.GetUint32(), (startVal++) + (1u << 31)); + break; + default: + Y_FAIL("unexpected shard id"); + } + } + } + shard++; + } + UNIT_ASSERT_VALUES_EQUAL(rowsInserted, rowsRead); + } + } } } diff --git a/ydb/core/kqp/ut/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/kqp_scheme_ut.cpp index 46bb6eeb089..06dc4ba969b 100644 --- a/ydb/core/kqp/ut/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scheme_ut.cpp @@ -2183,7 +2183,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } } - Y_UNIT_TEST(AlterTableWithDecimalColumn) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); |