aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2022-07-20 16:05:42 +0300
committerdcherednik <dcherednik@ydb.tech>2022-07-20 16:05:42 +0300
commit53e2e801caef50a19bb28d7f0c59ddbafe334950 (patch)
treec36e12f7fe8b5a8da70972df07b6d015e2c9a39a
parent070dcb976b8e2c3f36068da9b27be329346b90a5 (diff)
downloadydb-53e2e801caef50a19bb28d7f0c59ddbafe334950.tar.gz
Check write in to index table during index rename produce correct result.
-rw-r--r--ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp126
-rw-r--r--ydb/core/kqp/ut/kqp_scheme_ut.cpp1
2 files changed, 123 insertions, 4 deletions
diff --git a/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp
index 77fea831b7e..aba32bdb25e 100644
--- a/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_indexes_multishard_ut.cpp
@@ -4,6 +4,7 @@
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <library/cpp/json/json_reader.h>
+#include <library/cpp/threading/local_executor/local_executor.h>
#include <util/string/printf.h>
@@ -18,15 +19,16 @@ namespace {
NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query) {
const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
- return session.ExecuteDataQuery(query, txSettings).ExtractValueSync();
+ return session.ExecuteDataQuery(query, txSettings,
+ TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync();
}
NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, TParams& params) {
const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
- return session.ExecuteDataQuery(query, txSettings, params).ExtractValueSync();
+ return session.ExecuteDataQuery(query, txSettings, params,
+ TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync();
}
-
void CreateTableWithMultishardIndex(Tests::TClient& client) {
const TString scheme = R"(Name: "MultiShardIndexed"
Columns { Name: "key" Type: "Uint64" }
@@ -1095,6 +1097,124 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
[[4294967295u];[4u];["v4"]]
])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ Y_UNIT_TEST_NEW_ENGINE(WriteIntoRenamingIndex) {
+ TKikimrRunner kikimr;
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ CreateTableWithMultishardIndex(kikimr.GetTestClient());
+
+ auto buildParam = [&db](ui64 id) {
+ return db.GetParamsBuilder()
+ .AddParam("$rows")
+ .BeginList()
+ .AddListItem()
+ .BeginStruct()
+ .AddMember("key").Uint64(id)
+ .AddMember("fk").Uint32(id)
+ .AddMember("value").Utf8("v1")
+ .EndStruct()
+ .AddListItem()
+ .BeginStruct()
+ .AddMember("key").Uint64(id << 31)
+ .AddMember("fk").Uint32(id + (1u << 31))
+ .AddMember("value").Utf8("v2")
+ .EndStruct()
+ .EndList()
+ .Build()
+ .Build();
+ };
+
+ TMutex SyncMutex;
+ TCondVar SyncCondVar;
+ const size_t rows = 1000;
+ size_t rowsInserted = 0;
+
+ NPar::LocalExecutor().RunAdditionalThreads(2);
+ NPar::LocalExecutor().ExecRange([&](int id) mutable {
+ switch (id) {
+ case 0: {
+ size_t count = rows;
+ while (--count) {
+ const TString q(R"(
+ DECLARE $rows AS List < Struct<key: Uint64, fk: Uint32, value: Utf8 > >;
+ UPSERT INTO `/Root/MultiShardIndexed` (key, fk, value)
+ SELECT key, fk, value FROM AS_TABLE($rows);
+ )");
+ auto r = db.RetryOperationSync([=](TSession s) {
+ auto p = buildParam(count);
+ auto t = ExecuteDataQuery(s, q, p);
+ return t;
+ });
+ UNIT_ASSERT_C(r.IsSuccess(), r.GetIssues().ToString());
+ rowsInserted += 2;
+ // Let write 10 rows before the index will be renamed
+ if (count == (rows - 10)) {
+ TGuard<TMutex> guard(SyncMutex);
+ SyncCondVar.Signal();
+ }
+ }
+ }
+ break;
+ case 1: {
+ TGuard<TMutex> guard(SyncMutex);
+ SyncCondVar.WaitI(SyncMutex);
+ auto s = db.CreateSession().GetValueSync().GetSession();
+ auto st = s.ExecuteSchemeQuery(R"(
+ ALTER TABLE `/Root/MultiShardIndexed` RENAME INDEX index TO index_new;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(st.GetStatus(), EStatus::SUCCESS, st.GetIssues().ToString());
+ }
+ break;
+ default:
+ Y_FAIL("Unknown id");
+ }
+ }, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+
+ {
+ TReadTableSettings settings;
+ settings.Ordered(true);
+ auto it = session.ReadTable("/Root/MultiShardIndexed/index_new/indexImplTable", settings).GetValueSync();
+ UNIT_ASSERT(it.IsSuccess());
+
+ int shard = 0;
+ size_t rowsRead = 0;
+ for (;;) {
+ auto tablePart = it.ReadNext().GetValueSync();
+ if (tablePart.EOS()) {
+ break;
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(tablePart.IsSuccess(), true);
+ auto resultSet = tablePart.ExtractPart();
+
+ auto rsParser = TResultSetParser(resultSet);
+
+ ui32 startVal = 1;
+ while (rsParser.TryNextRow()) {
+ auto val = rsParser.GetValue(0);
+ TValueParser vp(val);
+ vp.OpenOptional();
+ if (!vp.IsNull()) {
+ rowsRead++;
+ switch (shard) {
+ case 0:
+ UNIT_ASSERT_VALUES_EQUAL(vp.GetUint32(), startVal++);
+ break;
+ case 1:
+ UNIT_ASSERT_VALUES_EQUAL(vp.GetUint32(), (startVal++) + (1u << 31));
+ break;
+ default:
+ Y_FAIL("unexpected shard id");
+ }
+ }
+ }
+ shard++;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(rowsInserted, rowsRead);
+ }
+ }
}
}
diff --git a/ydb/core/kqp/ut/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/kqp_scheme_ut.cpp
index 46bb6eeb089..06dc4ba969b 100644
--- a/ydb/core/kqp/ut/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scheme_ut.cpp
@@ -2183,7 +2183,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}
-
Y_UNIT_TEST(AlterTableWithDecimalColumn) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();