diff options
author | gvit <gvit@ydb.tech> | 2023-10-09 22:24:56 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-10-09 22:39:59 +0300 |
commit | 8c020b4b0b9323674b5c40875fed4ffb9fada74a (patch) | |
tree | 3af3672d385a43712945753a5cc1ceadbbea78fb | |
parent | 383cb7b4187c047b0947c472aa5f20447fff6740 (diff) | |
download | ydb-8c020b4b0b9323674b5c40875fed4ffb9fada74a.tar.gz |
upsert if exists mode in bulk upsert for column build operation KIKIMR-18963
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/build_index.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_common_upload.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp | 172 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 4 |
7 files changed, 206 insertions, 2 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index da9a3f1537..3514732878 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -867,6 +867,7 @@ message TEvUploadRowsRequest { optional uint64 CancelDeadlineMs = 4; // Wallclock timestamp (not duration) optional bool WriteToTableShadow = 5; optional uint64 OverloadSubscribe = 7; + optional bool UpsertIfExists = 8 [ default = false ]; } message TEvUploadRowsResponse { diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp index 1f50a7a832..f76f078be9 100644 --- a/ydb/core/tx/datashard/build_index.cpp +++ b/ydb/core/tx/datashard/build_index.cpp @@ -592,11 +592,16 @@ private: "Upload, last key " << DebugPrintPoint(KeyTypes, WriteBuf.GetLastKey().GetCells(), *AppData()->TypeRegistry) << " " << Debug()); + auto writeMode = NTxProxy::EUploadRowsMode::WriteToTableShadow; + if (ColumnBuildSettings.columnSize() > 0) { + writeMode = NTxProxy::EUploadRowsMode::UpsertIfExists; + } + auto actor = NTxProxy::CreateUploadRowsInternal( SelfId(), TargetTable, UploadColumnsTypes, WriteBuf.GetRowsData(), - NTxProxy::EUploadRowsMode::WriteToTableShadow, + writeMode, true /*writeToPrivateTable*/); Uploader = TActivationContext::AsActorContext().MakeFor(SelfId()).Register(actor); diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index a525a53fae..5f1161d04f 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -54,6 +54,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } } + const bool upsertIfExists = record.GetUpsertIfExists(); const bool writeToTableShadow = record.GetWriteToTableShadow(); const bool readForTableShadow = writeToTableShadow && !shadowTableId; const ui32 writeTableId = writeToTableShadow && shadowTableId ? shadowTableId : localTableId; @@ -151,6 +152,23 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } } + if (upsertIfExists) { + rowState.Init(tagsForSelect.size()); + auto ready = userDb.SelectRow(fullTableId, key, tagsForSelect, rowState); + if (ready == NTable::EReady::Page) { + pageFault = true; + } + + if (pageFault) { + continue; + } + + if (rowState == NTable::ERowOp::Erase || rowState == NTable::ERowOp::Absent) { + // in upsert if exists mode we must be sure that we insert only existing rows. + continue; + } + } + value.clear(); size_t vi = 0; for (const auto& vt : valueCols) { @@ -179,6 +197,8 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } if (!writeToTableShadow) { + // note, that for upsertIfExists mode we must break locks, because otherwise we can + // produce inconsistency. if (BreakLocks) { if (breakWriteConflicts) { if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { diff --git a/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp index 714fc1187c..72419fef01 100644 --- a/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp +++ b/ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp @@ -9,7 +9,7 @@ using namespace NKikimr; using namespace NSchemeShard; using namespace NSchemeShardUT_Private; -Y_UNIT_TEST_SUITE(IndexBuildTest) { +Y_UNIT_TEST_SUITE(ColumnBuildTest) { Y_UNIT_TEST(AlreadyExists) { TTestBasicRuntime runtime; TTestEnv env(runtime); @@ -228,6 +228,176 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) { Y_ASSERT(descr.GetIndexBuild().GetState() == Ydb::Table::IndexBuildState::STATE_DONE); } + Y_UNIT_TEST(BuildColumnDoesnotRestoreDeletedRows) { + TTestBasicRuntime runtime(1, false); + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", + "Name: \"ResourceDB\""); + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", + "StoragePools { " + " Name: \"pool-1\" " + " Kind: \"pool-kind-1\" " + "} " + "StoragePools { " + " Name: \"pool-2\" " + " Kind: \"pool-kind-2\" " + "} " + "PlanResolution: 50 " + "Coordinators: 1 " + "Mediators: 1 " + "TimeCastBucketsPerMediator: 2 " + "ExternalSchemeShard: true " + "Name: \"ResourceDB\""); + env.TestWaitNotification(runtime, txId); + + const auto attrs = AlterUserAttrs({ + {"cloud_id", "CLOUD_ID_VAL"}, + {"folder_id", "FOLDER_ID_VAL"}, + {"database_id", "DATABASE_ID_VAL"} + }); + + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"( + Name: "ServerLessDB" + ResourcesDomainKey { + SchemeShard: %lu + PathId: 2 + } + )", TTestTxConfig::SchemeShard), attrs); + env.TestWaitNotification(runtime, txId); + + TString alterData = TStringBuilder() + << "PlanResolution: 50 " + << "Coordinators: 1 " + << "Mediators: 1 " + << "TimeCastBucketsPerMediator: 2 " + << "ExternalSchemeShard: true " + << "ExternalHive: false " + << "Name: \"ServerLessDB\" " + << "StoragePools { " + << " Name: \"pool-1\" " + << " Kind: \"pool-kind-1\" " + << "} "; + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", alterData); + env.TestWaitNotification(runtime, txId); + + ui64 tenantSchemeShard = 0; + TestDescribeResult(DescribePath(runtime, "/MyRoot/ServerLessDB"), + {NLs::PathExist, + NLs::IsExternalSubDomain("ServerLessDB"), + NLs::ExtractTenantSchemeshard(&tenantSchemeShard)}); + + // Just create main table + TestCreateTable(runtime, tenantSchemeShard, ++txId, "/MyRoot/ServerLessDB", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "index" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId, tenantSchemeShard); + + auto fnWriteRow = [&] (ui64 tabletId, ui32 key, ui32 index, TString value, const char* table) { + TString writeQuery = Sprintf(R"( + ( + (let key '( '('key (Uint64 '%u ) ) ) ) + (let row '( '('index (Uint64 '%u ) ) '('value (Utf8 '%s) ) ) ) + (return (AsList (UpdateRow '__user__%s key row) )) + ) + )", key, index, value.c_str(), table); + NKikimrMiniKQL::TResult result; + TString err; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err); + UNIT_ASSERT_VALUES_EQUAL(err, ""); + UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);; + }; + + auto fnDeleteRow = [&] (ui64 tabletId, ui32 key, const char* table) { + TString writeQuery = Sprintf(R"( + ( + (let key '( '('key (Uint64 '%u ) ) ) ) + (return (AsList (EraseRow '__user__%s key ) )) + ) + )", key, table); + NKikimrMiniKQL::TResult result; + TString err; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err); + UNIT_ASSERT_VALUES_EQUAL(err, ""); + UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);; + }; + + for (ui32 delta = 0; delta < 101; ++delta) { + fnWriteRow(TTestTxConfig::FakeHiveTablets + 6, 1 + delta, 1000 + delta, "aaaa", "Table"); + } + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE); + + TestDescribeResult(DescribePath(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB/Table"), + {NLs::PathExist, + NLs::IndexesCount(0), + NLs::PathVersionEqual(3)}); + + TStringBuilder meteringMessages; + + bool enabledCapture = true; + TVector<TAutoPtr<IEventHandle>> delayedUpsertRows; + auto grab = [&delayedUpsertRows, &enabledCapture](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { + if (enabledCapture && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvUploadRowsRequest::EventType) { + delayedUpsertRows.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayedUpsertRows](IEventHandle&) { + return delayedUpsertRows.size() > 0; + }); + + runtime.SetObserverFunc(grab); + + Ydb::TypedValue defaultValue; + defaultValue.mutable_type()->set_type_id(Ydb::Type::UINT64); + defaultValue.mutable_value()->set_uint64_value(10); + + AsyncBuildColumn(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", "/MyRoot/ServerLessDB/Table", "DefaultValue", defaultValue); + + runtime.DispatchEvents(opts); + Cerr << delayedUpsertRows.size() << Endl; + UNIT_ASSERT_C(delayedUpsertRows.size() > 0, "not captured several events"); + + for (ui32 delta = 0; delta < 50; ++delta) { + fnDeleteRow(TTestTxConfig::FakeHiveTablets + 6, 1 + delta, "Table"); + } + + for (const auto& ev: delayedUpsertRows) { + runtime.Send(ev); + } + + enabledCapture = false; + + auto listing = TestListBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB"); + Y_ASSERT(listing.EntriesSize() == 1); + + env.TestWaitNotification(runtime, txId, tenantSchemeShard); + + auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); + Y_ASSERT(descr.GetIndexBuild().GetState() == Ydb::Table::IndexBuildState::STATE_DONE); + + for (ui32 delta = 50; delta < 101; ++delta) { + UNIT_ASSERT(CheckLocalRowExists(runtime, TTestTxConfig::FakeHiveTablets + 6, "__user__Table", "key", 1 + delta)); + } + + for (ui32 delta = 0; delta < 50; ++delta) { + UNIT_ASSERT(!CheckLocalRowExists(runtime, TTestTxConfig::FakeHiveTablets + 6, "__user__Table", "key", 1 + delta)); + } + } + Y_UNIT_TEST(BaseCase) { TTestBasicRuntime runtime; TTestEnv env(runtime); diff --git a/ydb/core/tx/tx_proxy/upload_rows.cpp b/ydb/core/tx/tx_proxy/upload_rows.cpp index 3c4580a8a8..5bf5d915ee 100644 --- a/ydb/core/tx/tx_proxy/upload_rows.cpp +++ b/ydb/core/tx/tx_proxy/upload_rows.cpp @@ -29,6 +29,9 @@ public: case EUploadRowsMode::WriteToTableShadow: WriteToTableShadow = true; break; + case EUploadRowsMode::UpsertIfExists: + UpsertIfExists = true; + break; } } diff --git a/ydb/core/tx/tx_proxy/upload_rows.h b/ydb/core/tx/tx_proxy/upload_rows.h index 63590b2bff..7c37472620 100644 --- a/ydb/core/tx/tx_proxy/upload_rows.h +++ b/ydb/core/tx/tx_proxy/upload_rows.h @@ -14,6 +14,7 @@ namespace NTxProxy { enum class EUploadRowsMode { Normal, WriteToTableShadow, + UpsertIfExists, }; using TUploadTypes = TVector<std::pair<TString, Ydb::Type>>; diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 22f3b8c76b..8bc74b8ecd 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -204,6 +204,7 @@ protected: bool WriteToTableShadow = false; bool AllowWriteToPrivateTable = false; bool DiskQuotaExceeded = false; + bool UpsertIfExists = false; std::shared_ptr<arrow::RecordBatch> Batch; float RuCost = 0.0; @@ -1046,6 +1047,9 @@ private: if (WriteToTableShadow) { ev->Record.SetWriteToTableShadow(true); } + if (UpsertIfExists) { + ev->Record.SetUpsertIfExists(true); + } // Copy protobuf settings without rows retryState->Headers = ev->Record; } |