diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-08-18 17:14:33 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-08-18 19:11:14 +0300 |
commit | 1cbfd34a55732f7b1d407986b45e40853f01f2c2 (patch) | |
tree | 5786127461bee5550ef95ce51bf5217c2b061a2c | |
parent | 89efb41db3e4ce655dac0b6b3da2fb32616fa98c (diff) | |
download | ydb-1cbfd34a55732f7b1d407986b45e40853f01f2c2.tar.gz |
Move out schemeshard_build_index_tx_base code from *.h file
7 files changed, 432 insertions, 392 deletions
diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt index ccf5144f2c..a1d3433ee0 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt @@ -258,6 +258,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index 0843f2fe66..f399397e85 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -259,6 +259,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt index 0843f2fe66..f399397e85 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt @@ -259,6 +259,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt index 85c19cfc15..35bbced865 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt @@ -258,6 +258,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp new file mode 100644 index 0000000000..5324d4c8ae --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -0,0 +1,406 @@ +#include "schemeshard_build_index_tx_base.h" + +#include "schemeshard_impl.h" +#include "schemeshard_identificators.h" +#include "schemeshard_billing_helpers.h" +#include "schemeshard_build_index_helpers.h" + +#include "schemeshard__operation_side_effects.h" + +#include <ydb/core/metering/metering.h> +#include <ydb/core/tablet_flat/tablet_flat_executor.h> + +namespace NKikimr { +namespace NSchemeShard { + +void TSchemeShard::TIndexBuilder::TTxBase::ApplyState(NTabletFlatExecutor::TTransactionContext& txc) { + for (auto& rec: StateChanges) { + TIndexBuildId buildId; + TIndexBuildInfo::EState state; + std::tie(buildId, state) = rec; + + Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId); + auto buildInfo = Self->IndexBuilds.at(buildId); + LOG_I("Change state from " << buildInfo->State << " to " << state); + buildInfo->State = state; + + NIceDb::TNiceDb db(txc.DB); + Self->PersistBuildIndexState(db, buildInfo); + } +} + +void TSchemeShard::TIndexBuilder::TTxBase::ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) { + SideEffects.ApplyOnExecute(Self, txc, ctx); + ApplyState(txc); + ApplyBill(txc, ctx); +} + +void TSchemeShard::TIndexBuilder::TTxBase::ApplyOnComplete(const TActorContext& ctx) { + SideEffects.ApplyOnComplete(Self, ctx); + ApplySchedule(ctx); +} + +void TSchemeShard::TIndexBuilder::TTxBase::ApplySchedule(const TActorContext& ctx) { + for (const auto& rec: ToScheduleBilling) { + ctx.Schedule( + std::get<1>(rec), + new TEvPrivate::TEvIndexBuildingMakeABill( + ui64(std::get<0>(rec)), + ctx.Now())); + } +} + +ui64 TSchemeShard::TIndexBuilder::TTxBase::RequestUnits(const TBillingStats& stats) { + return TRUCalculator::ReadTable(stats.GetBytes()) + + TRUCalculator::BulkUpsert(stats.GetBytes(), stats.GetRows()); +} + +void TSchemeShard::TIndexBuilder::TTxBase::RoundPeriod(TInstant& start, TInstant& end) { + if (start.Hours() == end.Hours()) { + return; // that is OK + } + + TInstant hourEnd = TInstant::Hours(end.Hours()); + + if (end - hourEnd >= TDuration::Seconds(1)) { + start = hourEnd; + return; + } + + if (hourEnd - start >= TDuration::Seconds(2)) { + end = hourEnd - TDuration::Seconds(1); + return; + } + + start = hourEnd - TDuration::Seconds(2); + end = hourEnd - TDuration::Seconds(1); +} + +void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) +{ + for (const auto& rec: ToBill) { + const auto& buildId = std::get<0>(rec); + auto startPeriod = std::get<1>(rec); + auto endPeriod = std::get<2>(rec); + + if (!startPeriod && !endPeriod) { + startPeriod = endPeriod = ctx.Now(); + } + + RoundPeriod(startPeriod, endPeriod); + + Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId); + auto buildInfo = Self->IndexBuilds.at(buildId); + + TBillingStats toBill = buildInfo->Processed - buildInfo->Billed; + if (!toBill) { + continue; + } + + TPath domain = TPath::Init(buildInfo->DomainPathId, Self); + TPathElement::TPtr pathEl = domain.Base(); + + TString cloud_id; + if (pathEl->UserAttrs->Attrs.contains("cloud_id")) { + cloud_id = pathEl->UserAttrs->Attrs.at("cloud_id"); + } + TString folder_id; + if (pathEl->UserAttrs->Attrs.contains("folder_id")) { + folder_id = pathEl->UserAttrs->Attrs.at("folder_id"); + } + TString database_id; + if (pathEl->UserAttrs->Attrs.contains("database_id")) { + database_id = pathEl->UserAttrs->Attrs.at("database_id"); + } + + if (!cloud_id || !folder_id || !database_id) { + LOG_N("ApplyBill: unable to make a bill, neither cloud_id and nor folder_id nor database_id have found in user attributes at the domain" + << ", build index operation: " << buildId + << ", domain: " << domain.PathString() + << ", domainId: " << buildInfo->DomainPathId + << ", tableId: " << buildInfo->TablePathId + << ", not billed usage: " << toBill); + continue; + } + + if (!Self->IsServerlessDomain(domain)) { + LOG_N("ApplyBill: unable to make a bill, domain is not a serverless db" + << ", build index operation: " << buildId + << ", domain: " << domain.PathString() + << ", domainId: " << buildInfo->DomainPathId + << ", IsDomainSchemeShard: " << Self->IsDomainSchemeShard + << ", ParentDomainId: " << Self->ParentDomainId + << ", ResourcesDomainId: " << domain.DomainInfo()->GetResourcesDomainId() + << ", not billed usage: " << toBill); + continue; + } + + NIceDb::TNiceDb db(txc.DB); + + buildInfo->Billed += toBill; + Self->PersistBuildIndexBilling(db, buildInfo); + + ui64 requestUnits = RequestUnits(toBill); + + TString id = TStringBuilder() + << buildId << "-" + << buildInfo->TablePathId.OwnerId << "-" << buildInfo->TablePathId.LocalPathId << "-" + << buildInfo->Billed.GetRows() << "-" << buildInfo->Billed.GetBytes() << "-" + << buildInfo->Processed.GetRows() << "-" << buildInfo->Processed.GetBytes(); + + const TString billRecord = TBillRecord() + .Id(id) + .CloudId(cloud_id) + .FolderId(folder_id) + .ResourceId(database_id) + .SourceWt(ctx.Now()) + .Usage(TBillRecord::RequestUnits(requestUnits, startPeriod, endPeriod)) + .ToString(); + + LOG_D("ApplyBill: made a bill" + << ", buildInfo: " << *buildInfo + << ", record: '" << billRecord << "'"); + + auto request = MakeHolder<NMetering::TEvMetering::TEvWriteMeteringJson>(std::move(billRecord)); + // send message at Complete stage + Send(NMetering::MakeMeteringServiceID(), std::move(request)); + } +} + +void TSchemeShard::TIndexBuilder::TTxBase::Send(TActorId dst, THolder<IEventBase> message, ui32 flags, ui64 cookie) { + SideEffects.Send(dst, message.Release(), cookie, flags); +} + +void TSchemeShard::TIndexBuilder::TTxBase::ChangeState(TIndexBuildId id, TIndexBuildInfo::EState state) { + StateChanges.push_back(TChangeStateRec(id, state)); +} + +void TSchemeShard::TIndexBuilder::TTxBase::Progress(TIndexBuildId id) { + SideEffects.ToProgress(id); +} + +void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo::TPtr indexInfo) { + index.SetId(ui64(indexInfo->Id)); + if (indexInfo->Issue) { + AddIssue(index.MutableIssues(), indexInfo->Issue); + } + + for (const auto& item: indexInfo->Shards) { + const TShardIdx& shardIdx = item.first; + const TIndexBuildInfo::TShardStatus& status = item.second; + + if (status.Status != NKikimrTxDataShard::TEvBuildIndexProgressResponse::INPROGRESS) { + if (status.UploadStatus != Ydb::StatusIds::SUCCESS) { + if (status.DebugMessage) { + AddIssue(index.MutableIssues(), status.ToString(shardIdx)); + } + } + } + } + + switch (indexInfo->State) { + case TIndexBuildInfo::EState::Locking: + case TIndexBuildInfo::EState::GatheringStatistics: + case TIndexBuildInfo::EState::Initiating: + index.SetState(Ydb::Table::IndexBuildState::STATE_PREPARING); + index.SetProgress(0.0); + break; + case TIndexBuildInfo::EState::Filling: + index.SetState(Ydb::Table::IndexBuildState::STATE_TRANSFERING_DATA); + index.SetProgress(indexInfo->CalcProgressPercent()); + break; + case TIndexBuildInfo::EState::Applying: + case TIndexBuildInfo::EState::Unlocking: + index.SetState(Ydb::Table::IndexBuildState::STATE_APPLYING); + index.SetProgress(100.0); + break; + case TIndexBuildInfo::EState::Done: + index.SetState(Ydb::Table::IndexBuildState::STATE_DONE); + index.SetProgress(100.0); + break; + case TIndexBuildInfo::EState::Cancellation_Applying: + case TIndexBuildInfo::EState::Cancellation_Unlocking: + index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLATION); + index.SetProgress(0.0); + break; + case TIndexBuildInfo::EState::Cancelled: + index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLED); + index.SetProgress(0.0); + break; + case TIndexBuildInfo::EState::Rejection_Applying: + index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION); + index.SetProgress(0.0); + break; + case TIndexBuildInfo::EState::Rejection_Unlocking: + index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION); + index.SetProgress(0.0); + break; + case TIndexBuildInfo::EState::Rejected: + index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTED); + index.SetProgress(0.0); + break; + case TIndexBuildInfo::EState::Invalid: + index.SetState(Ydb::Table::IndexBuildState::STATE_UNSPECIFIED); + break; + } + + Fill(*index.MutableSettings(), indexInfo); +} + +void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr indexInfo) { + TPath table = TPath::Init(indexInfo->TablePathId, Self); + settings.set_source_path(table.PathString()); + + Ydb::Table::TableIndex& index = *settings.mutable_index(); + index.set_name(indexInfo->IndexName); + + *index.mutable_index_columns() = { + indexInfo->IndexColumns.begin(), + indexInfo->IndexColumns.end() + }; + + *index.mutable_data_columns() = { + indexInfo->DataColumns.begin(), + indexInfo->DataColumns.end() + }; + + switch (indexInfo->IndexType) { + case NKikimrSchemeOp::EIndexType::EIndexTypeGlobal: + *index.mutable_global_index() = Ydb::Table::GlobalIndex(); + break; + case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync: + *index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex(); + break; + case NKikimrSchemeOp::EIndexType::EIndexTypeInvalid: + Y_FAIL("Unreachable"); + }; + + settings.set_max_batch_bytes(indexInfo->Limits.MaxBatchBytes); + settings.set_max_batch_rows(indexInfo->Limits.MaxBatchRows); + settings.set_max_shards_in_flight(indexInfo->Limits.MaxShards); + settings.set_max_retries_upload_batch(indexInfo->Limits.MaxRetries); +} + +void TSchemeShard::TIndexBuilder::TTxBase::AddIssue(::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>* issues, + const TString& message, + NYql::TSeverityIds::ESeverityId severity) +{ + auto& issue = *issues->Add(); + issue.set_severity(severity); + issue.set_message(message); +} + +void TSchemeShard::TIndexBuilder::TTxBase::SendNotificationsIfFinished(TIndexBuildInfo::TPtr indexInfo) { + if (!indexInfo->IsFinished()) { + return; + } + + LOG_T("TIndexBuildInfo SendNotifications: " + << ": id# " << indexInfo->Id + << ", subscribers count# " << indexInfo->Subscribers.size()); + + TSet<TActorId> toAnswer; + toAnswer.swap(indexInfo->Subscribers); + for (auto& actorId: toAnswer) { + Send(actorId, MakeHolder<TEvSchemeShard::TEvNotifyTxCompletionResult>(ui64(indexInfo->Id))); + } +} + +void TSchemeShard::TIndexBuilder::TTxBase::EraseBuildInfo(const TIndexBuildInfo::TPtr indexBuildInfo) { + Self->IndexBuilds.erase(indexBuildInfo->Id); + Self->IndexBuildsByUid.erase(indexBuildInfo->Uid); + + Self->TxIdToIndexBuilds.erase(indexBuildInfo->LockTxId); + Self->TxIdToIndexBuilds.erase(indexBuildInfo->InitiateTxId); + Self->TxIdToIndexBuilds.erase(indexBuildInfo->ApplyTxId); + Self->TxIdToIndexBuilds.erase(indexBuildInfo->UnlockTxId); +} + +Ydb::StatusIds::StatusCode TSchemeShard::TIndexBuilder::TTxBase::TranslateStatusCode(NKikimrScheme::EStatus status) { + switch (status) { + case NKikimrScheme::StatusSuccess: + case NKikimrScheme::StatusAccepted: + case NKikimrScheme::StatusAlreadyExists: + return Ydb::StatusIds::SUCCESS; + + case NKikimrScheme::StatusPathDoesNotExist: + case NKikimrScheme::StatusPathIsNotDirectory: + case NKikimrScheme::StatusSchemeError: + case NKikimrScheme::StatusNameConflict: + case NKikimrScheme::StatusInvalidParameter: + case NKikimrScheme::StatusRedirectDomain: + return Ydb::StatusIds::BAD_REQUEST; + + case NKikimrScheme::StatusMultipleModifications: + case NKikimrScheme::StatusQuotaExceeded: + return Ydb::StatusIds::OVERLOADED; + + case NKikimrScheme::StatusReadOnly: + case NKikimrScheme::StatusPreconditionFailed: + case NKikimrScheme::StatusResourceExhausted: //TODO: find better YDB status for it + return Ydb::StatusIds::PRECONDITION_FAILED; + + case NKikimrScheme::StatusAccessDenied: + return Ydb::StatusIds::UNAUTHORIZED; + case NKikimrScheme::StatusNotAvailable: + case NKikimrScheme::StatusTxIdNotExists: + case NKikimrScheme::StatusTxIsNotCancellable: + case NKikimrScheme::StatusReserved18: + case NKikimrScheme::StatusReserved19: + Y_FAIL("unreachable"); + } + + return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; +} + +void TSchemeShard::TIndexBuilder::TTxBase::Bill(const TIndexBuildInfo::TPtr& indexBuildInfo, + TInstant startPeriod, TInstant endPeriod) +{ + ToBill.push_back(TToBill(indexBuildInfo->Id, std::move(startPeriod), std::move(endPeriod))); +} + +void TSchemeShard::TIndexBuilder::TTxBase::AskToScheduleBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) { + if (indexBuildInfo->BillingEventIsScheduled) { + return; + } + + if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) { + return; + } + + indexBuildInfo->BillingEventIsScheduled = true; + + ToScheduleBilling.push_back(TBillingEventSchedule(indexBuildInfo->Id, indexBuildInfo->ReBillPeriod)); +} + +bool TSchemeShard::TIndexBuilder::TTxBase::GotScheduledBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) { + if (!indexBuildInfo->BillingEventIsScheduled) { + return false; + } + + if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) { + return false; + } + + indexBuildInfo->BillingEventIsScheduled = false; + + return true; +} + +bool TSchemeShard::TIndexBuilder::TTxBase::Execute(TTransactionContext& txc, const TActorContext& ctx) { + if (!DoExecute(txc, ctx)) { + return false; + } + + ApplyOnExecute(txc, ctx); + return true; +} + +void TSchemeShard::TIndexBuilder::TTxBase::Complete(const TActorContext& ctx) { + DoComplete(ctx); + + ApplyOnComplete(ctx); +} + +} // NSchemeShard +} // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h index e9f4adb574..4542a8c7d4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h @@ -1,14 +1,6 @@ #pragma once #include "schemeshard_impl.h" -#include "schemeshard_identificators.h" -#include "schemeshard_billing_helpers.h" -#include "schemeshard_build_index_helpers.h" - -#include "schemeshard__operation_part.h" // TSideEffects, make separate file - -#include <ydb/core/metering/metering.h> -#include <ydb/core/tablet_flat/tablet_flat_executor.h> namespace NKikimr { namespace NSchemeShard { @@ -24,379 +16,29 @@ private: using TToBill = std::tuple<TIndexBuildId, TInstant, TInstant>; TDeque<TToBill> ToBill; - void ApplyState(NTabletFlatExecutor::TTransactionContext& txc) { - for (auto& rec: StateChanges) { - TIndexBuildId buildId; - TIndexBuildInfo::EState state; - std::tie(buildId, state) = rec; - - Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId); - auto buildInfo = Self->IndexBuilds.at(buildId); - LOG_I("Change state from " << buildInfo->State << " to " << state); - buildInfo->State = state; - - NIceDb::TNiceDb db(txc.DB); - Self->PersistBuildIndexState(db, buildInfo); - } - } - - void ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) { - SideEffects.ApplyOnExecute(Self, txc, ctx); - ApplyState(txc); - ApplyBill(txc, ctx); - } - - void ApplyOnComplete(const TActorContext& ctx) { - SideEffects.ApplyOnComplete(Self, ctx); - ApplySchedule(ctx); - } - - void ApplySchedule(const TActorContext& ctx) { - for (const auto& rec: ToScheduleBilling) { - ctx.Schedule( - std::get<1>(rec), - new TEvPrivate::TEvIndexBuildingMakeABill( - ui64(std::get<0>(rec)), - ctx.Now())); - } - } - - ui64 RequestUnits(const TBillingStats& stats) { - return TRUCalculator::ReadTable(stats.GetBytes()) - + TRUCalculator::BulkUpsert(stats.GetBytes(), stats.GetRows()); - } - - void RoundPeriod(TInstant& start, TInstant& end) { - if (start.Hours() == end.Hours()) { - return; // that is OK - } - - TInstant hourEnd = TInstant::Hours(end.Hours()); - - if (end - hourEnd >= TDuration::Seconds(1)) { - start = hourEnd; - return; - } - - if (hourEnd - start >= TDuration::Seconds(2)) { - end = hourEnd - TDuration::Seconds(1); - return; - } - - start = hourEnd - TDuration::Seconds(2); - end = hourEnd - TDuration::Seconds(1); - } - - void ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) - { - for (const auto& rec: ToBill) { - const auto& buildId = std::get<0>(rec); - auto startPeriod = std::get<1>(rec); - auto endPeriod = std::get<2>(rec); - - if (!startPeriod && !endPeriod) { - startPeriod = endPeriod = ctx.Now(); - } - - RoundPeriod(startPeriod, endPeriod); - - Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId); - auto buildInfo = Self->IndexBuilds.at(buildId); - - TBillingStats toBill = buildInfo->Processed - buildInfo->Billed; - if (!toBill) { - continue; - } - - TPath domain = TPath::Init(buildInfo->DomainPathId, Self); - TPathElement::TPtr pathEl = domain.Base(); - - TString cloud_id; - if (pathEl->UserAttrs->Attrs.contains("cloud_id")) { - cloud_id = pathEl->UserAttrs->Attrs.at("cloud_id"); - } - TString folder_id; - if (pathEl->UserAttrs->Attrs.contains("folder_id")) { - folder_id = pathEl->UserAttrs->Attrs.at("folder_id"); - } - TString database_id; - if (pathEl->UserAttrs->Attrs.contains("database_id")) { - database_id = pathEl->UserAttrs->Attrs.at("database_id"); - } - - if (!cloud_id || !folder_id || !database_id) { - LOG_N("ApplyBill: unable to make a bill, neither cloud_id and nor folder_id nor database_id have found in user attributes at the domain" - << ", build index operation: " << buildId - << ", domain: " << domain.PathString() - << ", domainId: " << buildInfo->DomainPathId - << ", tableId: " << buildInfo->TablePathId - << ", not billed usage: " << toBill); - continue; - } - - if (!Self->IsServerlessDomain(domain)) { - LOG_N("ApplyBill: unable to make a bill, domain is not a serverless db" - << ", build index operation: " << buildId - << ", domain: " << domain.PathString() - << ", domainId: " << buildInfo->DomainPathId - << ", IsDomainSchemeShard: " << Self->IsDomainSchemeShard - << ", ParentDomainId: " << Self->ParentDomainId - << ", ResourcesDomainId: " << domain.DomainInfo()->GetResourcesDomainId() - << ", not billed usage: " << toBill); - continue; - } - - NIceDb::TNiceDb db(txc.DB); - - buildInfo->Billed += toBill; - Self->PersistBuildIndexBilling(db, buildInfo); - - ui64 requestUnits = RequestUnits(toBill); - - TString id = TStringBuilder() - << buildId << "-" - << buildInfo->TablePathId.OwnerId << "-" << buildInfo->TablePathId.LocalPathId << "-" - << buildInfo->Billed.GetRows() << "-" << buildInfo->Billed.GetBytes() << "-" - << buildInfo->Processed.GetRows() << "-" << buildInfo->Processed.GetBytes(); - - const TString billRecord = TBillRecord() - .Id(id) - .CloudId(cloud_id) - .FolderId(folder_id) - .ResourceId(database_id) - .SourceWt(ctx.Now()) - .Usage(TBillRecord::RequestUnits(requestUnits, startPeriod, endPeriod)) - .ToString(); - - LOG_D("ApplyBill: made a bill" - << ", buildInfo: " << *buildInfo - << ", record: '" << billRecord << "'"); - - auto request = MakeHolder<NMetering::TEvMetering::TEvWriteMeteringJson>(std::move(billRecord)); - // send message at Complete stage - Send(NMetering::MakeMeteringServiceID(), std::move(request)); - } - } + void ApplyState(NTabletFlatExecutor::TTransactionContext& txc); + void ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx); + void ApplyOnComplete(const TActorContext& ctx); + void ApplySchedule(const TActorContext& ctx); + ui64 RequestUnits(const TBillingStats& stats); + void RoundPeriod(TInstant& start, TInstant& end); + void ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx); protected: - void Send(TActorId dst, THolder<IEventBase> message, ui32 flags = 0, ui64 cookie = 0) { - SideEffects.Send(dst, message.Release(), cookie, flags); - } - - void ChangeState(TIndexBuildId id, TIndexBuildInfo::EState state) { - StateChanges.push_back(TChangeStateRec(id, state)); - } - - void Progress(TIndexBuildId id) { - SideEffects.ToProgress(id); - } - - void Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo::TPtr indexInfo) { - index.SetId(ui64(indexInfo->Id)); - if (indexInfo->Issue) { - AddIssue(index.MutableIssues(), indexInfo->Issue); - } - - for (const auto& item: indexInfo->Shards) { - const TShardIdx& shardIdx = item.first; - const TIndexBuildInfo::TShardStatus& status = item.second; - - if (status.Status != NKikimrTxDataShard::TEvBuildIndexProgressResponse::INPROGRESS) { - if (status.UploadStatus != Ydb::StatusIds::SUCCESS) { - if (status.DebugMessage) { - AddIssue(index.MutableIssues(), status.ToString(shardIdx)); - } - } - } - } - - switch (indexInfo->State) { - case TIndexBuildInfo::EState::Locking: - case TIndexBuildInfo::EState::GatheringStatistics: - case TIndexBuildInfo::EState::Initiating: - index.SetState(Ydb::Table::IndexBuildState::STATE_PREPARING); - index.SetProgress(0.0); - break; - case TIndexBuildInfo::EState::Filling: - index.SetState(Ydb::Table::IndexBuildState::STATE_TRANSFERING_DATA); - index.SetProgress(indexInfo->CalcProgressPercent()); - break; - case TIndexBuildInfo::EState::Applying: - case TIndexBuildInfo::EState::Unlocking: - index.SetState(Ydb::Table::IndexBuildState::STATE_APPLYING); - index.SetProgress(100.0); - break; - case TIndexBuildInfo::EState::Done: - index.SetState(Ydb::Table::IndexBuildState::STATE_DONE); - index.SetProgress(100.0); - break; - case TIndexBuildInfo::EState::Cancellation_Applying: - case TIndexBuildInfo::EState::Cancellation_Unlocking: - index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLATION); - index.SetProgress(0.0); - break; - case TIndexBuildInfo::EState::Cancelled: - index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLED); - index.SetProgress(0.0); - break; - case TIndexBuildInfo::EState::Rejection_Applying: - index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION); - index.SetProgress(0.0); - break; - case TIndexBuildInfo::EState::Rejection_Unlocking: - index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION); - index.SetProgress(0.0); - break; - case TIndexBuildInfo::EState::Rejected: - index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTED); - index.SetProgress(0.0); - break; - case TIndexBuildInfo::EState::Invalid: - index.SetState(Ydb::Table::IndexBuildState::STATE_UNSPECIFIED); - break; - } - - Fill(*index.MutableSettings(), indexInfo); - } - - void Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr indexInfo) { - TPath table = TPath::Init(indexInfo->TablePathId, Self); - settings.set_source_path(table.PathString()); - - Ydb::Table::TableIndex& index = *settings.mutable_index(); - index.set_name(indexInfo->IndexName); - - *index.mutable_index_columns() = { - indexInfo->IndexColumns.begin(), - indexInfo->IndexColumns.end() - }; - - *index.mutable_data_columns() = { - indexInfo->DataColumns.begin(), - indexInfo->DataColumns.end() - }; - - switch (indexInfo->IndexType) { - case NKikimrSchemeOp::EIndexType::EIndexTypeGlobal: - *index.mutable_global_index() = Ydb::Table::GlobalIndex(); - break; - case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync: - *index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex(); - break; - case NKikimrSchemeOp::EIndexType::EIndexTypeInvalid: - Y_FAIL("Unreachable"); - }; - - settings.set_max_batch_bytes(indexInfo->Limits.MaxBatchBytes); - settings.set_max_batch_rows(indexInfo->Limits.MaxBatchRows); - settings.set_max_shards_in_flight(indexInfo->Limits.MaxShards); - settings.set_max_retries_upload_batch(indexInfo->Limits.MaxRetries); - } - + void Send(TActorId dst, THolder<IEventBase> message, ui32 flags = 0, ui64 cookie = 0); + void ChangeState(TIndexBuildId id, TIndexBuildInfo::EState state); + void Progress(TIndexBuildId id); + void Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo::TPtr indexInfo); + void Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr indexInfo); void AddIssue(::google::protobuf::RepeatedPtrField< ::Ydb::Issue::IssueMessage>* issues, const TString& message, - NYql::TSeverityIds::ESeverityId severity = NYql::TSeverityIds::S_ERROR) - { - auto& issue = *issues->Add(); - issue.set_severity(severity); - issue.set_message(message); - } - - void SendNotificationsIfFinished(TIndexBuildInfo::TPtr indexInfo) { - if (!indexInfo->IsFinished()) { - return; - } - - LOG_T("TIndexBuildInfo SendNotifications: " - << ": id# " << indexInfo->Id - << ", subscribers count# " << indexInfo->Subscribers.size()); - - TSet<TActorId> toAnswer; - toAnswer.swap(indexInfo->Subscribers); - for (auto& actorId: toAnswer) { - Send(actorId, MakeHolder<TEvSchemeShard::TEvNotifyTxCompletionResult>(ui64(indexInfo->Id))); - } - } - - void EraseBuildInfo(const TIndexBuildInfo::TPtr indexBuildInfo) { - Self->IndexBuilds.erase(indexBuildInfo->Id); - Self->IndexBuildsByUid.erase(indexBuildInfo->Uid); - - Self->TxIdToIndexBuilds.erase(indexBuildInfo->LockTxId); - Self->TxIdToIndexBuilds.erase(indexBuildInfo->InitiateTxId); - Self->TxIdToIndexBuilds.erase(indexBuildInfo->ApplyTxId); - Self->TxIdToIndexBuilds.erase(indexBuildInfo->UnlockTxId); - } - - Ydb::StatusIds::StatusCode TranslateStatusCode(NKikimrScheme::EStatus status) { - switch (status) { - case NKikimrScheme::StatusSuccess: - case NKikimrScheme::StatusAccepted: - case NKikimrScheme::StatusAlreadyExists: - return Ydb::StatusIds::SUCCESS; - - case NKikimrScheme::StatusPathDoesNotExist: - case NKikimrScheme::StatusPathIsNotDirectory: - case NKikimrScheme::StatusSchemeError: - case NKikimrScheme::StatusNameConflict: - case NKikimrScheme::StatusInvalidParameter: - case NKikimrScheme::StatusRedirectDomain: - return Ydb::StatusIds::BAD_REQUEST; - - case NKikimrScheme::StatusMultipleModifications: - case NKikimrScheme::StatusQuotaExceeded: - return Ydb::StatusIds::OVERLOADED; - - case NKikimrScheme::StatusReadOnly: - case NKikimrScheme::StatusPreconditionFailed: - case NKikimrScheme::StatusResourceExhausted: //TODO: find better YDB status for it - return Ydb::StatusIds::PRECONDITION_FAILED; - - case NKikimrScheme::StatusAccessDenied: - return Ydb::StatusIds::UNAUTHORIZED; - case NKikimrScheme::StatusNotAvailable: - case NKikimrScheme::StatusTxIdNotExists: - case NKikimrScheme::StatusTxIsNotCancellable: - case NKikimrScheme::StatusReserved18: - case NKikimrScheme::StatusReserved19: - Y_FAIL("unreachable"); - } - - return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; - } - - void Bill(const TIndexBuildInfo::TPtr& indexBuildInfo, TInstant startPeriod = TInstant::Zero(), TInstant endPeriod = TInstant::Zero()) { - - ToBill.push_back(TToBill(indexBuildInfo->Id, std::move(startPeriod), std::move(endPeriod))); - } - - void AskToScheduleBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) { - if (indexBuildInfo->BillingEventIsScheduled) { - return; - } - - if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) { - return; - } - - indexBuildInfo->BillingEventIsScheduled = true; - - ToScheduleBilling.push_back(TBillingEventSchedule(indexBuildInfo->Id, indexBuildInfo->ReBillPeriod)); - } - - bool GotScheduledBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) { - if (!indexBuildInfo->BillingEventIsScheduled) { - return false; - } - - if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) { - return false; - } - - indexBuildInfo->BillingEventIsScheduled = false; - - return true; - } + NYql::TSeverityIds::ESeverityId severity = NYql::TSeverityIds::S_ERROR); + void SendNotificationsIfFinished(TIndexBuildInfo::TPtr indexInfo); + void EraseBuildInfo(const TIndexBuildInfo::TPtr indexBuildInfo); + Ydb::StatusIds::StatusCode TranslateStatusCode(NKikimrScheme::EStatus status); + void Bill(const TIndexBuildInfo::TPtr& indexBuildInfo, TInstant startPeriod = TInstant::Zero(), TInstant endPeriod = TInstant::Zero()); + void AskToScheduleBilling(const TIndexBuildInfo::TPtr& indexBuildInfo); + bool GotScheduledBilling(const TIndexBuildInfo::TPtr& indexBuildInfo); public: explicit TTxBase(TSelf* self) @@ -408,21 +50,8 @@ public: virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) = 0; virtual void DoComplete(const TActorContext& ctx) = 0; - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - if (!DoExecute(txc, ctx)) { - return false; - } - - ApplyOnExecute(txc, ctx); - return true; - } - - void Complete(const TActorContext& ctx) override { - DoComplete(ctx); - - ApplyOnComplete(ctx); - } - + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; }; } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 0e0c5e8d3b..1d458745bd 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -213,6 +213,7 @@ SRCS( schemeshard_import_flow_proposals.cpp schemeshard_import.cpp schemeshard_build_index.cpp + schemeshard_build_index_tx_base.cpp schemeshard_build_index__cancel.cpp schemeshard_build_index__forget.cpp schemeshard_build_index__list.cpp |