diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-28 16:32:45 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-28 17:08:06 +0300 |
commit | 34959cf2fc1527a871fc3ac9e59eee9157363e2f (patch) | |
tree | 0d0a242094ba33b15bb3c672c739e8a35cd016b8 | |
parent | 1e354259320faa36787f594c9ab79228751c8728 (diff) | |
download | ydb-34959cf2fc1527a871fc3ac9e59eee9157363e2f.tar.gz |
KIKIMR-19211: use separated insert conveyor and multi tasks mode usage
special hard verify on blob reading
-rw-r--r-- | ydb/core/driver_lib/run/config.h | 1 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 34 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.h | 6 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/console_config.proto | 2 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/columnshard.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/columnshard.h | 25 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/writer/write_controller.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/write_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/conveyor/usage/service.h | 26 | ||||
-rw-r--r-- | ydb/core/tx/ev_write/write_data.h | 9 |
17 files changed, 143 insertions, 12 deletions
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 80deeb3aed..00dcdc5d4a 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -72,6 +72,7 @@ union TBasicKikimrServicesMask { bool EnableExternalIndex: 1; bool EnableScanConveyor : 1; bool EnableCompConveyor : 1; + bool EnableInsertConveyor : 1; bool EnableLocalPgWire:1; bool EnableKafkaProxy:1; bool EnableIcNodeCacheService:1; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 7c5af73734..519e845213 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2126,9 +2126,9 @@ void TCompConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* se NConveyor::TConfig serviceConfig; if (Config.HasCompConveyorConfig()) { Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetCompConveyorConfig())); - if (!serviceConfig.HasDefaultFractionOfThreadsCount()) { - serviceConfig.SetDefaultFractionOfThreadsCount(0.33); - } + } + if (!serviceConfig.HasDefaultFractionOfThreadsCount()) { + serviceConfig.SetDefaultFractionOfThreadsCount(0.33); } if (serviceConfig.IsEnabled()) { @@ -2152,6 +2152,9 @@ void TScanConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* se if (Config.HasScanConveyorConfig()) { Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetScanConveyorConfig())); } + if (!serviceConfig.HasDefaultFractionOfThreadsCount()) { + serviceConfig.SetDefaultFractionOfThreadsCount(0.33); + } if (serviceConfig.IsEnabled()) { TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); @@ -2165,6 +2168,31 @@ void TScanConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* se } } +TInsertConveyorInitializer::TInsertConveyorInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) { +} + +void TInsertConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NConveyor::TConfig serviceConfig; + if (Config.HasInsertConveyorConfig()) { + Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetInsertConveyorConfig())); + } + if (!serviceConfig.HasDefaultFractionOfThreadsCount()) { + serviceConfig.SetDefaultFractionOfThreadsCount(0.2); + } + + if (serviceConfig.IsEnabled()) { + TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); + TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_INSERT_CONVEYOR"); + + auto service = NConveyor::TInsertServiceOperator::CreateService(serviceConfig, conveyorGroup); + + setup->LocalServices.push_back(std::make_pair( + NConveyor::TInsertServiceOperator::MakeServiceId(NodeId), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + TExternalIndexInitializer::TExternalIndexInitializer(const TKikimrRunConfig& runConfig) : IKikimrServicesInitializer(runConfig) { } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index f2048a0b2c..fba690bfbb 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -401,6 +401,12 @@ public: void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; +class TInsertConveyorInitializer: public IKikimrServicesInitializer { +public: + TInsertConveyorInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + class TExternalIndexInitializer: public IKikimrServicesInitializer { public: TExternalIndexInitializer(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index cb4c6de3db..a6fd20052d 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1506,6 +1506,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TCompConveyorInitializer(runConfig)); } + if (serviceMask.EnableInsertConveyor) { + sil->AddServiceInitializer(new TInsertConveyorInitializer(runConfig)); + } + if (serviceMask.EnableBackgroundTasks) { sil->AddServiceInitializer(new TBackgroundTasksInitializer(runConfig)); } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8f932ea610..ad2d137bfb 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -2050,6 +2050,7 @@ message TAppConfig { optional TKafkaProxyConfig KafkaProxyConfig = 71; optional TConveyorConfig CompConveyorConfig = 72; optional TQueryServiceConfig QueryServiceConfig = 73; + optional TConveyorConfig InsertConveyorConfig = 74; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto index 3b37c9763c..76c52ce8dc 100644 --- a/ydb/core/protos/console_config.proto +++ b/ydb/core/protos/console_config.proto @@ -126,6 +126,8 @@ message TConfigItem { AwsCompatibilityConfigItem = 70; KafkaProxyConfig = 71; CompConveyorConfigItem = 72; + QueryServiceConfigItem = 73; + InsertConveyorConfigItem = 74; NamedConfigsItem = 100; ClusterYamlConfigItem = 101; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index d250e8277e..00f60235e2 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -766,6 +766,11 @@ namespace Tests { const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NConveyor::TCompServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } + { + auto* actor = NConveyor::TInsertServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters()); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); + Runtime->RegisterService(NConveyor::TInsertServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); + } Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 5f5068768c..017df46d15 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -74,16 +74,20 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo if (writeMeta.HasLongTxId()) { auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode); ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(); } else { auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); Y_VERIFY(operation); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails"); ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(); } + CSCounters.OnFailedWriteResponse(); } else { CSCounters.OnWritePutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle1PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle2PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle3PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle4PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle5PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds()); LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); @@ -161,10 +165,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ") << WritesMonitor.DebugString() << " at tablet " << TabletID()); - + writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now()); std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(), StoragesManager->GetInsertOperator()->StartWritingAction(), writeData); - NConveyor::TCompServiceOperator::SendTaskToExecute(task); + NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 2e1d6622e8..6d6612e97d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -51,7 +51,7 @@ class TOperationsManager; extern bool gAllowLogBatchingDefaultValue; -IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline); +IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline); IActor* CreateReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor, const TActorId& dstActor, const std::shared_ptr<NOlap::IStoragesManager>& storages, std::unique_ptr<TEvColumnShard::TEvReadResult>&& event, @@ -60,7 +60,6 @@ IActor* CreateReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor, const TActorId& columnShardActorId, ui64 requestCookie, const TConcreteScanCounters& counters); IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); -IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); struct TSettings { static constexpr ui32 MAX_ACTIVE_COMPACTIONS = 1; diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp index ca92df0aa8..989deae6b9 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.cpp +++ b/ydb/core/tx/columnshard/counters/columnshard.cpp @@ -34,6 +34,11 @@ TCSCounters::TCSCounters() SplitCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("SplitCompaction/PortionsCount"); HistogramSuccessWritePutBlobsDurationMs = TBase::GetHistogram("SuccessWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramSuccessWriteMiddle1PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle1PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramSuccessWriteMiddle2PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle2PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramSuccessWriteMiddle3PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle3PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramSuccessWriteMiddle4PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle4PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramSuccessWriteMiddle5PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle5PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); WritePutBlobsCount = TBase::GetValue("WritePutBlobs"); diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index 3f602a830f..ffe935c995 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -34,6 +34,11 @@ private: std::shared_ptr<TValueAggregationClient> SplitCompactionGranulePortionsCount; NMonitoring::THistogramPtr HistogramSuccessWritePutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramSuccessWriteMiddle1PutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramSuccessWriteMiddle2PutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramSuccessWriteMiddle3PutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramSuccessWriteMiddle4PutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramSuccessWriteMiddle5PutBlobsDurationMs; NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs; NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs; NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount; @@ -60,6 +65,26 @@ public: WritePutBlobsCount->Sub(1); } + void OnWriteMiddle1PutBlobsSuccess(const ui32 milliseconds) const { + HistogramSuccessWriteMiddle1PutBlobsDurationMs->Collect(milliseconds); + } + + void OnWriteMiddle2PutBlobsSuccess(const ui32 milliseconds) const { + HistogramSuccessWriteMiddle2PutBlobsDurationMs->Collect(milliseconds); + } + + void OnWriteMiddle3PutBlobsSuccess(const ui32 milliseconds) const { + HistogramSuccessWriteMiddle3PutBlobsDurationMs->Collect(milliseconds); + } + + void OnWriteMiddle4PutBlobsSuccess(const ui32 milliseconds) const { + HistogramSuccessWriteMiddle4PutBlobsDurationMs->Collect(milliseconds); + } + + void OnWriteMiddle5PutBlobsSuccess(const ui32 milliseconds) const { + HistogramSuccessWriteMiddle5PutBlobsDurationMs->Collect(milliseconds); + } + void OnWritePutBlobsFail(const ui32 milliseconds) const { HistogramFailedWritePutBlobsDurationMs->Collect(milliseconds); WritePutBlobsCount->Sub(1); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index e4b8b122ae..ca7cbfe4a3 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -20,6 +20,7 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const } void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) { + WriteData.MutableWriteMeta().SetWriteMiddle4StartInstant(TMonotonic::Now()); if (putResult->GetPutStatus() == NKikimrProto::OK) { std::vector<std::shared_ptr<IBlobsWritingAction>> actions = {Action}; auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), actions, WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion()); @@ -30,4 +31,8 @@ void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, } } +void TIndexedWriteController::DoOnStartSending() { + WriteData.MutableWriteMeta().SetWriteMiddle5StartInstant(TMonotonic::Now()); +} + } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index c9bac60f4b..5261ee7eef 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -5,6 +5,7 @@ #include <ydb/core/tx/ev_write/write_data.h> #include <ydb/core/tx/columnshard/blobs_action/abstract/write.h> +#include <ydb/core/tx/columnshard/counters/common/object_counter.h> #include <ydb/core/tx/columnshard/engines/portion_info.h> #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/columnshard/columnshard_private_events.h> @@ -13,7 +14,7 @@ namespace NKikimr::NOlap { -class TIndexedWriteController : public NColumnShard::IWriteController { +class TIndexedWriteController : public NColumnShard::IWriteController, public NColumnShard::TMonitoringObjectsCounter<TIndexedWriteController, true> { private: std::vector<NArrow::TSerializedBatch> BlobsSplitted; NEvWrite::TWriteData WriteData; @@ -21,6 +22,8 @@ private: TActorId DstActor; std::shared_ptr<IBlobsWritingAction> Action; void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; + virtual void DoOnStartSending() override; + public: TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<NArrow::TSerializedBatch>&& blobsSplitted); diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h index 38c7dda0f3..2056f98174 100644 --- a/ydb/core/tx/columnshard/engines/writer/write_controller.h +++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h @@ -51,6 +51,9 @@ protected: virtual void DoOnBlobWriteResult(const TEvBlobStorage::TEvPutResult& /*result*/) { } + virtual void DoOnStartSending() { + + } NOlap::TBlobWriteInfo& AddWriteTask(NOlap::TBlobWriteInfo&& task) { WritingActions.emplace(task.GetWriteOperator()->GetActionId(), task.GetWriteOperator()); @@ -67,6 +70,10 @@ public: using TPtr = std::shared_ptr<IWriteController>; virtual ~IWriteController() {} + void OnStartSending() { + DoOnStartSending(); + } + void OnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) { putResult->AddResources(ResourceUsage); DoOnReadyResult(ctx, putResult); diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 379028413c..6879d1bb59 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -19,7 +19,7 @@ class TWriteActor : public TActorBootstrapped<TWriteActor>, public TMonitoringOb TInstant Deadline; public: - TWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline) + TWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline) : TabletId(tabletId) , WriteController(writeController) , Deadline(deadline) @@ -74,6 +74,7 @@ public: } void Bootstrap(const TActorContext& ctx) { + WriteController->OnStartSending(); if (Deadline != TInstant::Max()) { TInstant now = TAppData::TimeProvider->Now(); if (Deadline <= now) { @@ -107,7 +108,7 @@ public: } // namespace -IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline) { +IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline) { return new TWriteActor(tabletId, writeController, deadline); } diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h index 6cbe5ef3e0..50e4fc8b9d 100644 --- a/ydb/core/tx/conveyor/usage/service.h +++ b/ydb/core/tx/conveyor/usage/service.h @@ -7,6 +7,22 @@ namespace NKikimr::NConveyor { +class TAsyncTaskExecutor: public TActorBootstrapped<TAsyncTaskExecutor> { +private: + const std::shared_ptr<ITask> Task; +public: + TAsyncTaskExecutor(const std::shared_ptr<ITask>& task) + : Task(task) + { + + } + + void Bootstrap() { + auto gAway = PassAwayGuard(); + Task->Execute(nullptr); + } +}; + template <class TConveyorPolicy> class TServiceOperatorImpl { private: @@ -20,6 +36,10 @@ private: return TConveyorPolicy::Name; } public: + static void AsyncTaskToExecute(const std::shared_ptr<ITask>& task) { + auto& context = NActors::TActorContext::AsActorContext(); + context.Register(new TAsyncTaskExecutor(task)); + } static bool SendTaskToExecute(const std::shared_ptr<ITask>& task) { auto& context = NActors::TActorContext::AsActorContext(); const NActors::TActorId& selfId = context.SelfID; @@ -55,7 +75,13 @@ public: static const inline TString Name = "Comp"; }; +class TInsertConveyorPolicy { +public: + static const inline TString Name = "Isrt"; +}; + using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>; using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>; +using TInsertServiceOperator = TServiceOperatorImpl<TInsertConveyorPolicy>; } diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index 9d95b58b98..81d1d673b2 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -28,6 +28,11 @@ class TWriteMeta { YDB_ACCESSOR_DEF(TString, DedupId); YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now()); + YDB_ACCESSOR(TMonotonic, WriteMiddle1StartInstant, TMonotonic::Now()); + YDB_ACCESSOR(TMonotonic, WriteMiddle2StartInstant, TMonotonic::Now()); + YDB_ACCESSOR(TMonotonic, WriteMiddle3StartInstant, TMonotonic::Now()); + YDB_ACCESSOR(TMonotonic, WriteMiddle4StartInstant, TMonotonic::Now()); + YDB_ACCESSOR(TMonotonic, WriteMiddle5StartInstant, TMonotonic::Now()); public: TWriteMeta(const ui64 writeId, const ui64 tableId, const NActors::TActorId& source) : WriteId(writeId) @@ -54,6 +59,10 @@ public: return WriteMeta; } + TWriteMeta& MutableWriteMeta() { + return WriteMeta; + } + ui64 GetSize() const { return Data->GetData().size(); } |