aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-09-28 16:32:45 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-09-28 17:08:06 +0300
commit34959cf2fc1527a871fc3ac9e59eee9157363e2f (patch)
tree0d0a242094ba33b15bb3c672c739e8a35cd016b8
parent1e354259320faa36787f594c9ab79228751c8728 (diff)
downloadydb-34959cf2fc1527a871fc3ac9e59eee9157363e2f.tar.gz
KIKIMR-19211: use separated insert conveyor and multi tasks mode usage
special hard verify on blob reading
-rw-r--r--ydb/core/driver_lib/run/config.h1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp34
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h6
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/console_config.proto2
-rw-r--r--ydb/core/testlib/test_client.cpp5
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp12
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h3
-rw-r--r--ydb/core/tx/columnshard/counters/columnshard.cpp5
-rw-r--r--ydb/core/tx/columnshard/counters/columnshard.h25
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h5
-rw-r--r--ydb/core/tx/columnshard/engines/writer/write_controller.h7
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp5
-rw-r--r--ydb/core/tx/conveyor/usage/service.h26
-rw-r--r--ydb/core/tx/ev_write/write_data.h9
17 files changed, 143 insertions, 12 deletions
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index 80deeb3aed..00dcdc5d4a 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -72,6 +72,7 @@ union TBasicKikimrServicesMask {
bool EnableExternalIndex: 1;
bool EnableScanConveyor : 1;
bool EnableCompConveyor : 1;
+ bool EnableInsertConveyor : 1;
bool EnableLocalPgWire:1;
bool EnableKafkaProxy:1;
bool EnableIcNodeCacheService:1;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 7c5af73734..519e845213 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2126,9 +2126,9 @@ void TCompConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* se
NConveyor::TConfig serviceConfig;
if (Config.HasCompConveyorConfig()) {
Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetCompConveyorConfig()));
- if (!serviceConfig.HasDefaultFractionOfThreadsCount()) {
- serviceConfig.SetDefaultFractionOfThreadsCount(0.33);
- }
+ }
+ if (!serviceConfig.HasDefaultFractionOfThreadsCount()) {
+ serviceConfig.SetDefaultFractionOfThreadsCount(0.33);
}
if (serviceConfig.IsEnabled()) {
@@ -2152,6 +2152,9 @@ void TScanConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* se
if (Config.HasScanConveyorConfig()) {
Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetScanConveyorConfig()));
}
+ if (!serviceConfig.HasDefaultFractionOfThreadsCount()) {
+ serviceConfig.SetDefaultFractionOfThreadsCount(0.33);
+ }
if (serviceConfig.IsEnabled()) {
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
@@ -2165,6 +2168,31 @@ void TScanConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* se
}
}
+TInsertConveyorInitializer::TInsertConveyorInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig) {
+}
+
+void TInsertConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ NConveyor::TConfig serviceConfig;
+ if (Config.HasInsertConveyorConfig()) {
+ Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetInsertConveyorConfig()));
+ }
+ if (!serviceConfig.HasDefaultFractionOfThreadsCount()) {
+ serviceConfig.SetDefaultFractionOfThreadsCount(0.2);
+ }
+
+ if (serviceConfig.IsEnabled()) {
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_INSERT_CONVEYOR");
+
+ auto service = NConveyor::TInsertServiceOperator::CreateService(serviceConfig, conveyorGroup);
+
+ setup->LocalServices.push_back(std::make_pair(
+ NConveyor::TInsertServiceOperator::MakeServiceId(NodeId),
+ TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
+ }
+}
+
TExternalIndexInitializer::TExternalIndexInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index f2048a0b2c..fba690bfbb 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -401,6 +401,12 @@ public:
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
+class TInsertConveyorInitializer: public IKikimrServicesInitializer {
+public:
+ TInsertConveyorInitializer(const TKikimrRunConfig& runConfig);
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
class TExternalIndexInitializer: public IKikimrServicesInitializer {
public:
TExternalIndexInitializer(const TKikimrRunConfig& runConfig);
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index cb4c6de3db..a6fd20052d 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1506,6 +1506,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TCompConveyorInitializer(runConfig));
}
+ if (serviceMask.EnableInsertConveyor) {
+ sil->AddServiceInitializer(new TInsertConveyorInitializer(runConfig));
+ }
+
if (serviceMask.EnableBackgroundTasks) {
sil->AddServiceInitializer(new TBackgroundTasksInitializer(runConfig));
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 8f932ea610..ad2d137bfb 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -2050,6 +2050,7 @@ message TAppConfig {
optional TKafkaProxyConfig KafkaProxyConfig = 71;
optional TConveyorConfig CompConveyorConfig = 72;
optional TQueryServiceConfig QueryServiceConfig = 73;
+ optional TConveyorConfig InsertConveyorConfig = 74;
repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto
index 3b37c9763c..76c52ce8dc 100644
--- a/ydb/core/protos/console_config.proto
+++ b/ydb/core/protos/console_config.proto
@@ -126,6 +126,8 @@ message TConfigItem {
AwsCompatibilityConfigItem = 70;
KafkaProxyConfig = 71;
CompConveyorConfigItem = 72;
+ QueryServiceConfigItem = 73;
+ InsertConveyorConfigItem = 74;
NamedConfigsItem = 100;
ClusterYamlConfigItem = 101;
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index d250e8277e..00f60235e2 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -766,6 +766,11 @@ namespace Tests {
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NConveyor::TCompServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
+ {
+ auto* actor = NConveyor::TInsertServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters());
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
+ Runtime->RegisterService(NConveyor::TInsertServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
+ }
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
auto sysViewService = NSysView::CreateSysViewServiceForTests();
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 5f5068768c..017df46d15 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -74,16 +74,20 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
if (writeMeta.HasLongTxId()) {
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
- CSCounters.OnFailedWriteResponse();
} else {
auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
Y_VERIFY(operation);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails");
ctx.Send(writeMeta.GetSource(), result.release());
- CSCounters.OnFailedWriteResponse();
}
+ CSCounters.OnFailedWriteResponse();
} else {
CSCounters.OnWritePutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds());
+ CSCounters.OnWriteMiddle1PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds());
+ CSCounters.OnWriteMiddle2PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds());
+ CSCounters.OnWriteMiddle3PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds());
+ CSCounters.OnWriteMiddle4PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds());
+ CSCounters.OnWriteMiddle5PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds());
LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());
@@ -161,10 +165,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ")
<< WritesMonitor.DebugString()
<< " at tablet " << TabletID());
-
+ writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(),
StoragesManager->GetInsertOperator()->StartWritingAction(), writeData);
- NConveyor::TCompServiceOperator::SendTaskToExecute(task);
+ NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 2e1d6622e8..6d6612e97d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -51,7 +51,7 @@ class TOperationsManager;
extern bool gAllowLogBatchingDefaultValue;
-IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline);
+IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline);
IActor* CreateReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor,
const TActorId& dstActor, const std::shared_ptr<NOlap::IStoragesManager>& storages,
std::unique_ptr<TEvColumnShard::TEvReadResult>&& event,
@@ -60,7 +60,6 @@ IActor* CreateReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor,
const TActorId& columnShardActorId,
ui64 requestCookie, const TConcreteScanCounters& counters);
IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
-IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
struct TSettings {
static constexpr ui32 MAX_ACTIVE_COMPACTIONS = 1;
diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp
index ca92df0aa8..989deae6b9 100644
--- a/ydb/core/tx/columnshard/counters/columnshard.cpp
+++ b/ydb/core/tx/columnshard/counters/columnshard.cpp
@@ -34,6 +34,11 @@ TCSCounters::TCSCounters()
SplitCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("SplitCompaction/PortionsCount");
HistogramSuccessWritePutBlobsDurationMs = TBase::GetHistogram("SuccessWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramSuccessWriteMiddle1PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle1PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramSuccessWriteMiddle2PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle2PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramSuccessWriteMiddle3PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle3PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramSuccessWriteMiddle4PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle4PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramSuccessWriteMiddle5PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle5PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
WritePutBlobsCount = TBase::GetValue("WritePutBlobs");
diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h
index 3f602a830f..ffe935c995 100644
--- a/ydb/core/tx/columnshard/counters/columnshard.h
+++ b/ydb/core/tx/columnshard/counters/columnshard.h
@@ -34,6 +34,11 @@ private:
std::shared_ptr<TValueAggregationClient> SplitCompactionGranulePortionsCount;
NMonitoring::THistogramPtr HistogramSuccessWritePutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramSuccessWriteMiddle1PutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramSuccessWriteMiddle2PutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramSuccessWriteMiddle3PutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramSuccessWriteMiddle4PutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramSuccessWriteMiddle5PutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs;
NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount;
@@ -60,6 +65,26 @@ public:
WritePutBlobsCount->Sub(1);
}
+ void OnWriteMiddle1PutBlobsSuccess(const ui32 milliseconds) const {
+ HistogramSuccessWriteMiddle1PutBlobsDurationMs->Collect(milliseconds);
+ }
+
+ void OnWriteMiddle2PutBlobsSuccess(const ui32 milliseconds) const {
+ HistogramSuccessWriteMiddle2PutBlobsDurationMs->Collect(milliseconds);
+ }
+
+ void OnWriteMiddle3PutBlobsSuccess(const ui32 milliseconds) const {
+ HistogramSuccessWriteMiddle3PutBlobsDurationMs->Collect(milliseconds);
+ }
+
+ void OnWriteMiddle4PutBlobsSuccess(const ui32 milliseconds) const {
+ HistogramSuccessWriteMiddle4PutBlobsDurationMs->Collect(milliseconds);
+ }
+
+ void OnWriteMiddle5PutBlobsSuccess(const ui32 milliseconds) const {
+ HistogramSuccessWriteMiddle5PutBlobsDurationMs->Collect(milliseconds);
+ }
+
void OnWritePutBlobsFail(const ui32 milliseconds) const {
HistogramFailedWritePutBlobsDurationMs->Collect(milliseconds);
WritePutBlobsCount->Sub(1);
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index e4b8b122ae..ca7cbfe4a3 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -20,6 +20,7 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const
}
void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) {
+ WriteData.MutableWriteMeta().SetWriteMiddle4StartInstant(TMonotonic::Now());
if (putResult->GetPutStatus() == NKikimrProto::OK) {
std::vector<std::shared_ptr<IBlobsWritingAction>> actions = {Action};
auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), actions, WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion());
@@ -30,4 +31,8 @@ void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx,
}
}
+void TIndexedWriteController::DoOnStartSending() {
+ WriteData.MutableWriteMeta().SetWriteMiddle5StartInstant(TMonotonic::Now());
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
index c9bac60f4b..5261ee7eef 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
@@ -5,6 +5,7 @@
#include <ydb/core/tx/ev_write/write_data.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/write.h>
+#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
@@ -13,7 +14,7 @@
namespace NKikimr::NOlap {
-class TIndexedWriteController : public NColumnShard::IWriteController {
+class TIndexedWriteController : public NColumnShard::IWriteController, public NColumnShard::TMonitoringObjectsCounter<TIndexedWriteController, true> {
private:
std::vector<NArrow::TSerializedBatch> BlobsSplitted;
NEvWrite::TWriteData WriteData;
@@ -21,6 +22,8 @@ private:
TActorId DstActor;
std::shared_ptr<IBlobsWritingAction> Action;
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
+ virtual void DoOnStartSending() override;
+
public:
TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<NArrow::TSerializedBatch>&& blobsSplitted);
diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h
index 38c7dda0f3..2056f98174 100644
--- a/ydb/core/tx/columnshard/engines/writer/write_controller.h
+++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h
@@ -51,6 +51,9 @@ protected:
virtual void DoOnBlobWriteResult(const TEvBlobStorage::TEvPutResult& /*result*/) {
}
+ virtual void DoOnStartSending() {
+
+ }
NOlap::TBlobWriteInfo& AddWriteTask(NOlap::TBlobWriteInfo&& task) {
WritingActions.emplace(task.GetWriteOperator()->GetActionId(), task.GetWriteOperator());
@@ -67,6 +70,10 @@ public:
using TPtr = std::shared_ptr<IWriteController>;
virtual ~IWriteController() {}
+ void OnStartSending() {
+ DoOnStartSending();
+ }
+
void OnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) {
putResult->AddResources(ResourceUsage);
DoOnReadyResult(ctx, putResult);
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index 379028413c..6879d1bb59 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -19,7 +19,7 @@ class TWriteActor : public TActorBootstrapped<TWriteActor>, public TMonitoringOb
TInstant Deadline;
public:
- TWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline)
+ TWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline)
: TabletId(tabletId)
, WriteController(writeController)
, Deadline(deadline)
@@ -74,6 +74,7 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
+ WriteController->OnStartSending();
if (Deadline != TInstant::Max()) {
TInstant now = TAppData::TimeProvider->Now();
if (Deadline <= now) {
@@ -107,7 +108,7 @@ public:
} // namespace
-IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline) {
+IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline) {
return new TWriteActor(tabletId, writeController, deadline);
}
diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h
index 6cbe5ef3e0..50e4fc8b9d 100644
--- a/ydb/core/tx/conveyor/usage/service.h
+++ b/ydb/core/tx/conveyor/usage/service.h
@@ -7,6 +7,22 @@
namespace NKikimr::NConveyor {
+class TAsyncTaskExecutor: public TActorBootstrapped<TAsyncTaskExecutor> {
+private:
+ const std::shared_ptr<ITask> Task;
+public:
+ TAsyncTaskExecutor(const std::shared_ptr<ITask>& task)
+ : Task(task)
+ {
+
+ }
+
+ void Bootstrap() {
+ auto gAway = PassAwayGuard();
+ Task->Execute(nullptr);
+ }
+};
+
template <class TConveyorPolicy>
class TServiceOperatorImpl {
private:
@@ -20,6 +36,10 @@ private:
return TConveyorPolicy::Name;
}
public:
+ static void AsyncTaskToExecute(const std::shared_ptr<ITask>& task) {
+ auto& context = NActors::TActorContext::AsActorContext();
+ context.Register(new TAsyncTaskExecutor(task));
+ }
static bool SendTaskToExecute(const std::shared_ptr<ITask>& task) {
auto& context = NActors::TActorContext::AsActorContext();
const NActors::TActorId& selfId = context.SelfID;
@@ -55,7 +75,13 @@ public:
static const inline TString Name = "Comp";
};
+class TInsertConveyorPolicy {
+public:
+ static const inline TString Name = "Isrt";
+};
+
using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>;
using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>;
+using TInsertServiceOperator = TServiceOperatorImpl<TInsertConveyorPolicy>;
}
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index 9d95b58b98..81d1d673b2 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -28,6 +28,11 @@ class TWriteMeta {
YDB_ACCESSOR_DEF(TString, DedupId);
YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now());
+ YDB_ACCESSOR(TMonotonic, WriteMiddle1StartInstant, TMonotonic::Now());
+ YDB_ACCESSOR(TMonotonic, WriteMiddle2StartInstant, TMonotonic::Now());
+ YDB_ACCESSOR(TMonotonic, WriteMiddle3StartInstant, TMonotonic::Now());
+ YDB_ACCESSOR(TMonotonic, WriteMiddle4StartInstant, TMonotonic::Now());
+ YDB_ACCESSOR(TMonotonic, WriteMiddle5StartInstant, TMonotonic::Now());
public:
TWriteMeta(const ui64 writeId, const ui64 tableId, const NActors::TActorId& source)
: WriteId(writeId)
@@ -54,6 +59,10 @@ public:
return WriteMeta;
}
+ TWriteMeta& MutableWriteMeta() {
+ return WriteMeta;
+ }
+
ui64 GetSize() const {
return Data->GetData().size();
}