aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-10-09 22:24:56 +0300
committergvit <gvit@ydb.tech>2023-10-09 22:39:59 +0300
commit8c020b4b0b9323674b5c40875fed4ffb9fada74a (patch)
tree3af3672d385a43712945753a5cc1ceadbbea78fb
parent383cb7b4187c047b0947c472aa5f20447fff6740 (diff)
downloadydb-8c020b4b0b9323674b5c40875fed4ffb9fada74a.tar.gz
upsert if exists mode in bulk upsert for column build operation KIKIMR-18963
-rw-r--r--ydb/core/protos/tx_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/build_index.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp20
-rw-r--r--ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp172
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows.cpp3
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows.h1
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h4
7 files changed, 206 insertions, 2 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index da9a3f1537..3514732878 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -867,6 +867,7 @@ message TEvUploadRowsRequest {
optional uint64 CancelDeadlineMs = 4; // Wallclock timestamp (not duration)
optional bool WriteToTableShadow = 5;
optional uint64 OverloadSubscribe = 7;
+ optional bool UpsertIfExists = 8 [ default = false ];
}
message TEvUploadRowsResponse {
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp
index 1f50a7a832..f76f078be9 100644
--- a/ydb/core/tx/datashard/build_index.cpp
+++ b/ydb/core/tx/datashard/build_index.cpp
@@ -592,11 +592,16 @@ private:
"Upload, last key " << DebugPrintPoint(KeyTypes, WriteBuf.GetLastKey().GetCells(), *AppData()->TypeRegistry)
<< " " << Debug());
+ auto writeMode = NTxProxy::EUploadRowsMode::WriteToTableShadow;
+ if (ColumnBuildSettings.columnSize() > 0) {
+ writeMode = NTxProxy::EUploadRowsMode::UpsertIfExists;
+ }
+
auto actor = NTxProxy::CreateUploadRowsInternal(
SelfId(), TargetTable,
UploadColumnsTypes,
WriteBuf.GetRowsData(),
- NTxProxy::EUploadRowsMode::WriteToTableShadow,
+ writeMode,
true /*writeToPrivateTable*/);
Uploader = TActivationContext::AsActorContext().MakeFor(SelfId()).Register(actor);
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index a525a53fae..5f1161d04f 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -54,6 +54,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
}
+ const bool upsertIfExists = record.GetUpsertIfExists();
const bool writeToTableShadow = record.GetWriteToTableShadow();
const bool readForTableShadow = writeToTableShadow && !shadowTableId;
const ui32 writeTableId = writeToTableShadow && shadowTableId ? shadowTableId : localTableId;
@@ -151,6 +152,23 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
}
+ if (upsertIfExists) {
+ rowState.Init(tagsForSelect.size());
+ auto ready = userDb.SelectRow(fullTableId, key, tagsForSelect, rowState);
+ if (ready == NTable::EReady::Page) {
+ pageFault = true;
+ }
+
+ if (pageFault) {
+ continue;
+ }
+
+ if (rowState == NTable::ERowOp::Erase || rowState == NTable::ERowOp::Absent) {
+ // in upsert if exists mode we must be sure that we insert only existing rows.
+ continue;
+ }
+ }
+
value.clear();
size_t vi = 0;
for (const auto& vt : valueCols) {
@@ -179,6 +197,8 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
if (!writeToTableShadow) {
+ // note, that for upsertIfExists mode we must break locks, because otherwise we can
+ // produce inconsistency.
if (BreakLocks) {
if (breakWriteConflicts) {
if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
diff --git a/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp
index 714fc1187c..72419fef01 100644
--- a/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp
+++ b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp
@@ -9,7 +9,7 @@ using namespace NKikimr;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;
-Y_UNIT_TEST_SUITE(IndexBuildTest) {
+Y_UNIT_TEST_SUITE(ColumnBuildTest) {
Y_UNIT_TEST(AlreadyExists) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
@@ -228,6 +228,176 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {
Y_ASSERT(descr.GetIndexBuild().GetState() == Ydb::Table::IndexBuildState::STATE_DONE);
}
+ Y_UNIT_TEST(BuildColumnDoesnotRestoreDeletedRows) {
+ TTestBasicRuntime runtime(1, false);
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot",
+ "Name: \"ResourceDB\"");
+ env.TestWaitNotification(runtime, txId);
+
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot",
+ "StoragePools { "
+ " Name: \"pool-1\" "
+ " Kind: \"pool-kind-1\" "
+ "} "
+ "StoragePools { "
+ " Name: \"pool-2\" "
+ " Kind: \"pool-kind-2\" "
+ "} "
+ "PlanResolution: 50 "
+ "Coordinators: 1 "
+ "Mediators: 1 "
+ "TimeCastBucketsPerMediator: 2 "
+ "ExternalSchemeShard: true "
+ "Name: \"ResourceDB\"");
+ env.TestWaitNotification(runtime, txId);
+
+ const auto attrs = AlterUserAttrs({
+ {"cloud_id", "CLOUD_ID_VAL"},
+ {"folder_id", "FOLDER_ID_VAL"},
+ {"database_id", "DATABASE_ID_VAL"}
+ });
+
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ Name: "ServerLessDB"
+ ResourcesDomainKey {
+ SchemeShard: %lu
+ PathId: 2
+ }
+ )", TTestTxConfig::SchemeShard), attrs);
+ env.TestWaitNotification(runtime, txId);
+
+ TString alterData = TStringBuilder()
+ << "PlanResolution: 50 "
+ << "Coordinators: 1 "
+ << "Mediators: 1 "
+ << "TimeCastBucketsPerMediator: 2 "
+ << "ExternalSchemeShard: true "
+ << "ExternalHive: false "
+ << "Name: \"ServerLessDB\" "
+ << "StoragePools { "
+ << " Name: \"pool-1\" "
+ << " Kind: \"pool-kind-1\" "
+ << "} ";
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", alterData);
+ env.TestWaitNotification(runtime, txId);
+
+ ui64 tenantSchemeShard = 0;
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/ServerLessDB"),
+ {NLs::PathExist,
+ NLs::IsExternalSubDomain("ServerLessDB"),
+ NLs::ExtractTenantSchemeshard(&tenantSchemeShard)});
+
+ // Just create main table
+ TestCreateTable(runtime, tenantSchemeShard, ++txId, "/MyRoot/ServerLessDB", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "index" Type: "Uint64" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId, tenantSchemeShard);
+
+ auto fnWriteRow = [&] (ui64 tabletId, ui32 key, ui32 index, TString value, const char* table) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint64 '%u ) ) ) )
+ (let row '( '('index (Uint64 '%u ) ) '('value (Utf8 '%s) ) ) )
+ (return (AsList (UpdateRow '__user__%s key row) ))
+ )
+ )", key, index, value.c_str(), table);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
+ };
+
+ auto fnDeleteRow = [&] (ui64 tabletId, ui32 key, const char* table) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint64 '%u ) ) ) )
+ (return (AsList (EraseRow '__user__%s key ) ))
+ )
+ )", key, table);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
+ };
+
+ for (ui32 delta = 0; delta < 101; ++delta) {
+ fnWriteRow(TTestTxConfig::FakeHiveTablets + 6, 1 + delta, 1000 + delta, "aaaa", "Table");
+ }
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);
+
+ TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB/Table"),
+ {NLs::PathExist,
+ NLs::IndexesCount(0),
+ NLs::PathVersionEqual(3)});
+
+ TStringBuilder meteringMessages;
+
+ bool enabledCapture = true;
+ TVector<TAutoPtr<IEventHandle>> delayedUpsertRows;
+ auto grab = [&delayedUpsertRows, &enabledCapture](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ if (enabledCapture && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvUploadRowsRequest::EventType) {
+ delayedUpsertRows.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayedUpsertRows](IEventHandle&) {
+ return delayedUpsertRows.size() > 0;
+ });
+
+ runtime.SetObserverFunc(grab);
+
+ Ydb::TypedValue defaultValue;
+ defaultValue.mutable_type()->set_type_id(Ydb::Type::UINT64);
+ defaultValue.mutable_value()->set_uint64_value(10);
+
+ AsyncBuildColumn(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", "/MyRoot/ServerLessDB/Table", "DefaultValue", defaultValue);
+
+ runtime.DispatchEvents(opts);
+ Cerr << delayedUpsertRows.size() << Endl;
+ UNIT_ASSERT_C(delayedUpsertRows.size() > 0, "not captured several events");
+
+ for (ui32 delta = 0; delta < 50; ++delta) {
+ fnDeleteRow(TTestTxConfig::FakeHiveTablets + 6, 1 + delta, "Table");
+ }
+
+ for (const auto& ev: delayedUpsertRows) {
+ runtime.Send(ev);
+ }
+
+ enabledCapture = false;
+
+ auto listing = TestListBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB");
+ Y_ASSERT(listing.EntriesSize() == 1);
+
+ env.TestWaitNotification(runtime, txId, tenantSchemeShard);
+
+ auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId);
+ Y_ASSERT(descr.GetIndexBuild().GetState() == Ydb::Table::IndexBuildState::STATE_DONE);
+
+ for (ui32 delta = 50; delta < 101; ++delta) {
+ UNIT_ASSERT(CheckLocalRowExists(runtime, TTestTxConfig::FakeHiveTablets + 6, "__user__Table", "key", 1 + delta));
+ }
+
+ for (ui32 delta = 0; delta < 50; ++delta) {
+ UNIT_ASSERT(!CheckLocalRowExists(runtime, TTestTxConfig::FakeHiveTablets + 6, "__user__Table", "key", 1 + delta));
+ }
+ }
+
Y_UNIT_TEST(BaseCase) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
diff --git a/ydb/core/tx/tx_proxy/upload_rows.cpp b/ydb/core/tx/tx_proxy/upload_rows.cpp
index 3c4580a8a8..5bf5d915ee 100644
--- a/ydb/core/tx/tx_proxy/upload_rows.cpp
+++ b/ydb/core/tx/tx_proxy/upload_rows.cpp
@@ -29,6 +29,9 @@ public:
case EUploadRowsMode::WriteToTableShadow:
WriteToTableShadow = true;
break;
+ case EUploadRowsMode::UpsertIfExists:
+ UpsertIfExists = true;
+ break;
}
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows.h b/ydb/core/tx/tx_proxy/upload_rows.h
index 63590b2bff..7c37472620 100644
--- a/ydb/core/tx/tx_proxy/upload_rows.h
+++ b/ydb/core/tx/tx_proxy/upload_rows.h
@@ -14,6 +14,7 @@ namespace NTxProxy {
enum class EUploadRowsMode {
Normal,
WriteToTableShadow,
+ UpsertIfExists,
};
using TUploadTypes = TVector<std::pair<TString, Ydb::Type>>;
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 22f3b8c76b..8bc74b8ecd 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -204,6 +204,7 @@ protected:
bool WriteToTableShadow = false;
bool AllowWriteToPrivateTable = false;
bool DiskQuotaExceeded = false;
+ bool UpsertIfExists = false;
std::shared_ptr<arrow::RecordBatch> Batch;
float RuCost = 0.0;
@@ -1046,6 +1047,9 @@ private:
if (WriteToTableShadow) {
ev->Record.SetWriteToTableShadow(true);
}
+ if (UpsertIfExists) {
+ ev->Record.SetUpsertIfExists(true);
+ }
// Copy protobuf settings without rows
retryState->Headers = ev->Record;
}