aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2024-12-24 18:15:30 +0300
committerGitHub <noreply@github.com>2024-12-24 18:15:30 +0300
commit3096657831a636dcd1403e1d6f98a8dc284e6353 (patch)
tree5b2addc39e99f519b59c4be696c57bfc459aba31
parente432d9c27edb3c270328481e7bd3e8448da9ffc1 (diff)
downloadydb-3096657831a636dcd1403e1d6f98a8dc284e6353.tar.gz
Snapshot Isolation: kqp (#12825)
-rw-r--r--.github/config/muted_ya.txt2
-rw-r--r--ydb/core/data_integrity_trails/data_integrity_trails.h3
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp3
-rw-r--r--ydb/core/grpc_services/query/rpc_kqp_tx.cpp3
-rw-r--r--ydb/core/kqp/common/kqp_tx.cpp39
-rw-r--r--ydb/core/kqp/common/kqp_tx.h12
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp1
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.cpp8
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h9
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp9
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp18
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h23
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h5
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp7
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h1
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp3
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp5
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp22
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h11
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp24
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/tx/kqp_sink_common.h1
-rw-r--r--ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp232
-rw-r--r--ydb/core/kqp/ut/tx/ya.make1
-rw-r--r--ydb/core/protos/data_events.proto7
-rw-r--r--ydb/core/protos/kqp.proto6
-rw-r--r--ydb/core/protos/table_service_config.proto2
-rw-r--r--ydb/core/protos/tx_datashard.proto6
-rw-r--r--ydb/public/api/protos/ydb_query.proto3
-rw-r--r--ydb/public/api/protos/ydb_table.proto4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/tx.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h10
44 files changed, 485 insertions, 49 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index 3bffb2eb19..78515c88fc 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -49,6 +49,8 @@ ydb/core/kqp/ut/olap [*/*] chunk chunk
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
ydb/core/kqp/ut/query KqpLimits.QueryExecTimeoutCancel
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
+ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWrite*
+ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictRead*
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
diff --git a/ydb/core/data_integrity_trails/data_integrity_trails.h b/ydb/core/data_integrity_trails/data_integrity_trails.h
index 003505d5dc..1ec566048a 100644
--- a/ydb/core/data_integrity_trails/data_integrity_trails.h
+++ b/ydb/core/data_integrity_trails/data_integrity_trails.h
@@ -27,6 +27,9 @@ inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream&
case TransactionSettings::kSnapshotReadOnly:
LogKeyValue("TxMode", "SnapshotReadOnly", ss);
break;
+ case TransactionSettings::kSnapshotReadWrite:
+ LogKeyValue("TxMode", "SnapshotReadWrite", ss);
+ break;
case TransactionSettings::TX_MODE_NOT_SET:
LogKeyValue("TxMode", "Undefined", ss);
break;
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 1730d30a78..6e2164b8b2 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -64,6 +64,9 @@ bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::Tra
case Ydb::Query::TransactionSettings::kSnapshotReadOnly:
to.mutable_snapshot_read_only();
break;
+ case Ydb::Query::TransactionSettings::kSnapshotReadWrite:
+ to.mutable_snapshot_read_write();
+ break;
default:
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
"Invalid tx_settings"));
diff --git a/ydb/core/grpc_services/query/rpc_kqp_tx.cpp b/ydb/core/grpc_services/query/rpc_kqp_tx.cpp
index b9d6e9385a..888380c22a 100644
--- a/ydb/core/grpc_services/query/rpc_kqp_tx.cpp
+++ b/ydb/core/grpc_services/query/rpc_kqp_tx.cpp
@@ -104,6 +104,9 @@ private:
case Ydb::Query::TransactionSettings::kSnapshotReadOnly:
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_only();
break;
+ case Ydb::Query::TransactionSettings::kSnapshotReadWrite:
+ ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_write();
+ break;
}
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);
diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp
index 4d9f7ab905..e9b02b093c 100644
--- a/ydb/core/kqp/common/kqp_tx.cpp
+++ b/ydb/core/kqp/common/kqp_tx.cpp
@@ -158,7 +158,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
Y_UNUSED(config);
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE &&
- *txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO)
+ *txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO &&
+ *txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW)
return false;
if (txCtx.GetSnapshot().IsValid())
@@ -211,26 +212,42 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
YQL_ENSURE(!hasSinkWrite || hasEffects);
- // We don't want snapshot when there are effects at the moment,
- // because it hurts performance when there are multiple single-shard
- // reads and a single distributed commit. Taking snapshot costs
- // similar to an additional distributed transaction, and it's very
- // hard to predict when that happens, causing performance
- // degradation.
- if (hasEffects) {
- return false;
- }
-
// We need snapshot for stream lookup, besause it's used for dependent reads
if (hasStreamLookup) {
return true;
}
+ if (*txCtx.EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) {
+ if (hasEffects && !txCtx.HasTableRead) {
+ YQL_ENSURE(txCtx.HasTableWrite);
+ // Don't need snapshot for WriteOnly transaction.
+ return false;
+ } else if (hasEffects) {
+ YQL_ENSURE(txCtx.HasTableWrite);
+ // ReadWrite transaction => need snapshot
+ return true;
+ }
+ // ReadOnly transaction here
+ } else {
+ // We don't want snapshot when there are effects at the moment,
+ // because it hurts performance when there are multiple single-shard
+ // reads and a single distributed commit. Taking snapshot costs
+ // similar to an additional distributed transaction, and it's very
+ // hard to predict when that happens, causing performance
+ // degradation.
+ if (hasEffects) {
+ return false;
+ }
+ }
+
+ YQL_ENSURE(!hasEffects && !hasStreamLookup);
+
// We need snapshot when there are multiple table read phases, most
// likely it involves multiple tables and we would have to use a
// distributed commit otherwise. Taking snapshot helps as avoid TLI
// for read-only transactions, and costs less than a final distributed
// commit.
+ // NOTE: In case of read from single shard, we won't take snapshot.
return readPhases > 1;
}
diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h
index 8d324e0ae0..d7353de054 100644
--- a/ydb/core/kqp/common/kqp_tx.h
+++ b/ydb/core/kqp/common/kqp_tx.h
@@ -235,6 +235,7 @@ public:
HasOlapTable = false;
HasOltpTable = false;
HasTableWrite = false;
+ HasTableRead = false;
NeedUncommittedChangesFlush = false;
}
@@ -264,16 +265,24 @@ public:
Readonly = true;
break;
+ case Ydb::Table::TransactionSettings::kSnapshotReadWrite:
+ EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW;
+ Readonly = false;
+ break;
+
case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET:
YQL_ENSURE(false, "tx_mode not set, settings: " << settings);
break;
};
}
- bool ShouldExecuteDeferredEffects() const {
+ bool ShouldExecuteDeferredEffects(const TKqpPhyTxHolder::TConstPtr& tx) const {
if (NeedUncommittedChangesFlush || HasOlapTable) {
return !DeferredEffects.Empty();
}
+ if (EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW && !tx && HasTableRead) {
+ return !DeferredEffects.Empty();
+ }
return false;
}
@@ -343,6 +352,7 @@ public:
bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;
+ bool HasTableRead = false;
bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index d072d9e642..2e680ae374 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -652,6 +652,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
+ kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW();
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 115222e463..b777eb5133 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -315,6 +315,8 @@ private:
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
+ bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW();
+
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
@@ -346,7 +348,8 @@ private:
TableServiceConfig.GetEnableAstCache() != enableAstCache ||
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
- TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution) {
+ TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
+ TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW) {
QueryCache->Clear();
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 8e5295d15d..0b3612aa79 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -131,20 +131,20 @@ namespace NKikimr::NKqp {
using namespace NYql::NDq;
using namespace NYql::NDqProto;
-IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
+IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions,
NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode)
{
- return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
+ return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory),
settings, memoryLimits, std::move(traceId), std::move(arena), mode);
}
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
- const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
- return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId));
+ const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
+ return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta, shardsScanningPolicy, counters, std::move(traceId));
}
}
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 265332afbb..670481a20c 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -52,14 +52,15 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode);
-IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
- NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
- const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
+IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings,
+ const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode);
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
- const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
+ const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode,
+ const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
TIntrusivePtr<TKqpCounters> counters,
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
index 0df7f95021..59b2ef6fe1 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
@@ -212,8 +212,9 @@ public:
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
YQL_ENSURE(args.ComputesByStages);
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
- IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task,
- AsyncIoFactory, runtimeSettings, memoryLimits,
+ IActor* computeActor = CreateKqpScanComputeActor(
+ args.ExecuterId, args.TxId,
+ args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,
std::move(args.TraceId), std::move(args.Arena),
std::move(args.SchedulingOptions), args.BlockTrackingMode);
TActorId result = TlsActivationContext->Register(computeActor);
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
index 7dbf222bd5..2dcbf7cb1b 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
@@ -110,6 +110,7 @@ public:
const ui64 TxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
+ const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
NYql::NDqProto::TDqTask* Task;
TIntrusivePtr<NRm::TTxState> TxInfo;
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 7e6c8c136c..86a6dd676c 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -23,15 +23,13 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
} // anonymous namespace
-TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
+TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, EBlockTrackingMode mode)
: TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
, ComputeCtx(settings.StatsMode)
- , LockTxId(lockTxId)
- , LockNodeId(lockNodeId)
, BlockTrackingMode(mode)
{
InitializeTask();
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
index ae13ce8a40..fd9889f956 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
@@ -25,8 +25,6 @@ private:
std::set<NActors::TActorId> Fetchers;
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
- const TMaybe<ui64> LockTxId;
- const ui32 LockNodeId;
struct TLockHash {
bool operator()(const NKikimrDataEvents::TLock& lock) {
@@ -70,7 +68,7 @@ public:
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
}
- TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
+ TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, EBlockTrackingMode mode);
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index 087f4979b4..8faead8aa3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -23,7 +23,8 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3;
TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot,
- const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
+ const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors,
+ const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId, const TMaybe<NKikimrDataEvents::ELockMode> lockMode,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
: Meta(meta)
@@ -32,6 +33,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
, TxId(txId)
, LockTxId(lockTxId)
, LockNodeId(lockNodeId)
+ , LockMode(lockMode)
, ComputeActorIds(std::move(computeActors))
, Snapshot(snapshot)
, ShardsScanningPolicy(shardsScanningPolicy)
@@ -449,6 +451,9 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
ev->Record.SetTxId(std::get<ui64>(TxId));
if (LockTxId) {
ev->Record.SetLockTxId(*LockTxId);
+ if (LockMode) {
+ ev->Record.SetLockMode(*LockMode);
+ }
}
ev->Record.SetLockNodeId(LockNodeId);
ev->Record.SetTablePath(ScanDataMeta.TablePath);
@@ -499,8 +504,6 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
state->LastKey = std::move(msg.LastKey);
state->LastCursorProto = std::move(msg.LastCursorProto);
const ui64 rowsCount = msg.GetRowsCount();
- AFL_ENSURE(!LockTxId || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty());
- AFL_ENSURE(LockTxId || (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty()));
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action", "got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())
("from", ev->Sender)("shards remain", PendingShards.size())
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
index d513939b4e..73ead0a5ef 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
@@ -52,13 +52,16 @@ private:
const NYql::NDq::TTxId TxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
+ const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_SCAN_FETCH_ACTOR;
}
TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const NYql::NDq::TComputeRuntimeSettings& settings,
- std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
+ std::vector<NActors::TActorId>&& computeActors,
+ const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId, const TMaybe<NKikimrDataEvents::ELockMode> lockMode,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 9172c245f8..30854284d0 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -156,9 +156,13 @@ public:
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
if (Request.AcquireLocksTxId || Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback) {
- YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE);
+ YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE
+ || Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW);
}
+ YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
+ || tableServiceConfig.GetEnableKqpDataQueryStreamLookup());
+
ReadOnlyTx = IsReadOnlyTx();
}
@@ -1740,11 +1744,14 @@ private:
dataTransaction.SetPerShardKeysSizeLimitBytes(Request.PerShardKeysSizeLimitBytes);
}
- auto& lockTxId = TasksGraph.GetMeta().LockTxId;
+ const auto& lockTxId = TasksGraph.GetMeta().LockTxId;
if (lockTxId) {
dataTransaction.SetLockTxId(*lockTxId);
dataTransaction.SetLockNodeId(SelfId().NodeId());
}
+ if (TasksGraph.GetMeta().LockMode && ImmediateTx) {
+ dataTransaction.SetLockMode(*TasksGraph.GetMeta().LockMode);
+ }
for (auto& task : dataTransaction.GetKqpTransaction().GetTasks()) {
shardState.TaskIds.insert(task.GetId());
@@ -1775,7 +1782,11 @@ private:
(ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) |
(VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0);
std::unique_ptr<TEvDataShard::TEvProposeTransaction> evData;
- if (GetSnapshot().IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) {
+ if (GetSnapshot().IsValid()
+ && (ReadOnlyTx
+ || Request.UseImmediateEffects
+ || (Request.LocksOp == ELocksOp::Unspecified
+ && TasksGraph.GetMeta().LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION))) {
evData.reset(new TEvDataShard::TEvProposeTransaction(
NKikimrTxDataShard::TX_KIND_DATA,
SelfId(),
@@ -2664,6 +2675,7 @@ private:
.TxId = TxId,
.LockTxId = lockTxId,
.LockNodeId = SelfId().NodeId(),
+ .LockMode = TasksGraph.GetMeta().LockMode,
.Executer = SelfId(),
.Snapshot = GetSnapshot(),
.Database = Database,
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 89e9135fcd..9ff4540502 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -505,6 +505,15 @@ protected:
TasksGraph.GetMeta().SetLockTxId(lockTxId);
TasksGraph.GetMeta().SetLockNodeId(SelfId().NodeId());
+ switch (Request.IsolationLevel) {
+ case NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW:
+ TasksGraph.GetMeta().SetLockMode(NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION);
+ break;
+ default:
+ TasksGraph.GetMeta().SetLockMode(NKikimrDataEvents::OPTIMISTIC);
+ break;
+ }
+
LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
if (IsDebugLogEnabled()) {
for (auto& tx : Request.Transactions) {
@@ -953,6 +962,16 @@ protected:
if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) {
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
}
+ if (!settings.GetInconsistentTx()
+ && TasksGraph.GetMeta().LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION
+ && GetSnapshot().IsValid()) {
+ settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
+ settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);
+ }
+ if (!settings.GetInconsistentTx() && TasksGraph.GetMeta().LockMode) {
+ settings.SetLockMode(*TasksGraph.GetMeta().LockMode);
+ }
+
output.SinkSettings.ConstructInPlace();
output.SinkSettings->PackFrom(settings);
} else {
@@ -1213,6 +1232,10 @@ protected:
settings->SetLockNodeId(self.NodeId());
}
+ if (TasksGraph.GetMeta().LockMode) {
+ settings->SetLockMode(*TasksGraph.GetMeta().LockMode);
+ }
+
createdTasksIds.push_back(task.Id);
return task;
};
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 09d5076078..a126f30eea 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -85,6 +85,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
: TxId(args.TxId)
, LockTxId(args.LockTxId)
, LockNodeId(args.LockNodeId)
+ , LockMode(args.LockMode)
, ExecuterId(args.Executer)
, Snapshot(args.Snapshot)
, Database(args.Database)
@@ -207,6 +208,9 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetLockTxId(*LockTxId);
request.SetLockNodeId(LockNodeId);
}
+ if (LockMode) {
+ request.SetLockMode(*LockMode);
+ }
ActorIdToProto(ExecuterId, request.MutableExecuterActorId());
if (Deadline) {
@@ -487,6 +491,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
.TxId = TxId,
.LockTxId = LockTxId,
.LockNodeId = LockNodeId,
+ .LockMode = LockMode,
.Task = taskDesc,
.TxInfo = TxInfo,
.RuntimeSettings = settings,
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index a556b8198e..9297002b74 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -45,6 +45,7 @@ public:
const ui64 TxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
+ const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
const TActorId& Executer;
const IKqpGateway::TKqpSnapshot& Snapshot;
const TString& Database;
@@ -107,6 +108,7 @@ private:
const ui64 TxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
+ const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
const TActorId ExecuterId;
TVector<ui64> ComputeTasks;
THashMap<ui64, TVector<ui64>> TasksPerNode;
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 2e2ac0eda0..0b8b458a6e 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -1138,6 +1138,9 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&
input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId);
input.Meta.StreamLookupSettings->SetLockNodeId(tasksGraph.GetMeta().LockNodeId);
}
+ if (tasksGraph.GetMeta().LockMode) {
+ input.Meta.StreamLookupSettings->SetLockMode(*tasksGraph.GetMeta().LockMode);
+ }
transformProto->MutableSettings()->PackFrom(*input.Meta.StreamLookupSettings);
} else if (input.Meta.SequencerSettings) {
transformProto->MutableSettings()->PackFrom(*input.Meta.SequencerSettings);
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 6c0c1c729e..5ec0dba1f0 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -92,6 +92,7 @@ struct TGraphMeta {
IKqpGateway::TKqpSnapshot Snapshot;
TMaybe<ui64> LockTxId;
ui32 LockNodeId;
+ TMaybe<NKikimrDataEvents::ELockMode> LockMode;
std::unordered_map<ui64, TActorId> ResultChannelProxies;
TActorId ExecuterId;
bool UseFollowers = false;
@@ -122,6 +123,10 @@ struct TGraphMeta {
void SetLockNodeId(ui32 lockNodeId) {
LockNodeId = lockNodeId;
}
+
+ void SetLockMode(NKikimrDataEvents::ELockMode lockMode) {
+ LockMode = lockMode;
+ }
};
struct TTaskInputMeta {
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 0d6dc18e04..3010c0a94f 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -153,6 +153,9 @@ private:
? TMaybe<ui64>(msg.GetLockTxId())
: Nothing();
ui32 lockNodeId = msg.GetLockNodeId();
+ TMaybe<NKikimrDataEvents::ELockMode> lockMode = msg.HasLockMode()
+ ? TMaybe<NKikimrDataEvents::ELockMode>(msg.GetLockMode())
+ : Nothing();
YQL_ENSURE(msg.GetStartAllOrFail()); // todo: support partial start
@@ -259,6 +262,7 @@ private:
.TxId = txId,
.LockTxId = lockTxId,
.LockNodeId = lockNodeId,
+ .LockMode = lockMode,
.Task = &dqTask,
.TxInfo = txInfo,
.RuntimeSettings = runtimeSettingsBase,
@@ -305,7 +309,8 @@ private:
for (auto&& i : computesByStage) {
for (auto&& m : i.second.MutableMetaInfo()) {
Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()),
- m.GetMeta(), runtimeSettingsBase, txId, lockTxId, lockNodeId, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)));
+ m.GetMeta(), runtimeSettingsBase, txId, lockTxId, lockNodeId, lockMode,
+ scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)));
}
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index 11a6f8d2b1..56d7d0ad20 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -178,6 +178,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableConstantFolding = true;
ui64 DefaultEnableSpillingNodes = 0;
bool EnableAntlr4Parser = false;
+ bool EnableSnapshotIsolationRW = false;
void SetDefaultEnabledSpillingNodes(const TString& node);
ui64 GetEnabledSpillingNodes() const;
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 42d6075e64..fe16693416 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -836,6 +836,9 @@ public:
if (Settings->HasLockTxId() && BrokenLocks.empty()) {
record.SetLockTxId(Settings->GetLockTxId());
+ if (Settings->HasLockMode()) {
+ ev->Record.SetLockMode(Settings->GetLockMode());
+ }
}
if (Settings->HasLockNodeId()) {
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index d43222283b..d52e826fd3 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -39,6 +39,7 @@ public:
, AllowInconsistentReads(settings.GetAllowInconsistentReads())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
+ , LockMode(settings.HasLockMode() ? settings.GetLockMode() : TMaybe<NKikimrDataEvents::ELockMode>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, LookupStrategy(settings.GetLookupStrategy())
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
@@ -498,6 +499,9 @@ private:
if (LockTxId && BrokenLocks.empty()) {
record.SetLockTxId(*LockTxId);
+ if (LockMode) {
+ record.SetLockMode(*LockMode);
+ }
}
if (NodeLockId) {
@@ -641,6 +645,7 @@ private:
const bool AllowInconsistentReads;
const TMaybe<ui64> LockTxId;
const TMaybe<ui32> NodeLockId;
+ const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, TShardState> ReadsPerShard;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 8dd6dda3a2..a5244a8550 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -157,7 +157,6 @@ struct TKqpTableWriterStatistics {
THashSet<ui64> AffectedPartitions;
};
-
class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
using TBase = TActorBootstrapped<TKqpTableWriteActor>;
@@ -202,12 +201,16 @@ public:
TVector<NScheme::TTypeInfo> keyColumnTypes,
const NMiniKQL::TTypeEnvironment& typeEnv,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ const std::optional<NKikimrDataEvents::TMvccSnapshot>& mvccSnapshot,
+ const NKikimrDataEvents::ELockMode lockMode,
const IKqpTransactionManagerPtr& txManager,
const TActorId sessionActorId,
TIntrusivePtr<TKqpCounters> counters,
NWilson::TTraceId traceId)
: TypeEnv(typeEnv)
, Alloc(alloc)
+ , MvccSnapshot(mvccSnapshot)
+ , LockMode(lockMode)
, TableId(tableId)
, TablePath(tablePath)
, LockTxId(lockTxId)
@@ -850,6 +853,12 @@ public:
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
} else if (!InconsistentTx) {
evWrite->SetLockId(LockTxId, LockNodeId);
+ evWrite->Record.SetLockMode(LockMode);
+
+ if (LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION) {
+ YQL_ENSURE(MvccSnapshot);
+ *evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
+ }
}
const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite);
@@ -1041,6 +1050,9 @@ public:
const NMiniKQL::TTypeEnvironment& TypeEnv;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ const std::optional<NKikimrDataEvents::TMvccSnapshot> MvccSnapshot;
+ const NKikimrDataEvents::ELockMode LockMode;
+
const TTableId TableId;
const TString TablePath;
@@ -1133,6 +1145,8 @@ public:
std::move(keyColumnTypes),
TypeEnv,
Alloc,
+ Settings.GetMvccSnapshot(),
+ Settings.GetLockMode(),
nullptr,
TActorId{},
Counters,
@@ -1337,6 +1351,8 @@ struct TTransactionSettings {
ui64 LockTxId = 0;
ui64 LockNodeId = 0;
bool InconsistentTx = false;
+ std::optional<NKikimrDataEvents::TMvccSnapshot> MvccSnapshot;
+ NKikimrDataEvents::ELockMode LockMode;
};
struct TWriteSettings {
@@ -1470,6 +1486,8 @@ public:
std::move(keyColumnTypes),
TypeEnv,
Alloc,
+ settings.TransactionSettings.MvccSnapshot,
+ settings.TransactionSettings.LockMode,
TxManager,
SessionActorId,
Counters,
@@ -2475,6 +2493,8 @@ private:
.LockTxId = Settings.GetLockTxId(),
.LockNodeId = Settings.GetLockNodeId(),
.InconsistentTx = Settings.GetInconsistentTx(),
+ .MvccSnapshot = Settings.GetMvccSnapshot(),
+ .LockMode = Settings.GetLockMode(),
},
.Priority = Settings.GetPriority(),
.IsOlap = Settings.GetIsOlap(),
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 467605df5c..8c0dfa76ea 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -354,6 +354,12 @@ public:
return false;
}
+ if (TxCtx->EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) {
+ // ReadWrite snapshot isolation transaction with can only use uncommitted data.
+ // WriteOnly snapshot isolation transaction is executed like serializable transaction.
+ return !TxCtx->HasTableRead;
+ }
+
if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (tx && tx->GetHasEffects()) {
YQL_ENSURE(tx->ResultsSize() == 0);
@@ -370,7 +376,8 @@ public:
bool ShouldAcquireLocks(const TKqpPhyTxHolder::TConstPtr& tx) {
Y_UNUSED(tx);
- if (*TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
+ if (*TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE &&
+ *TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) {
return false;
}
@@ -415,7 +422,7 @@ public:
auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx);
if (TxCtx->CanDeferEffects()) {
- // At current time sinks require separate tnx with commit.
+ // Olap sinks require separate tnx with commit.
while (tx && tx->GetHasEffects() && !TxCtx->HasOlapTable) {
QueryData->CreateKqpValueMap(tx);
bool success = TxCtx->AddDeferredEffect(tx, QueryData);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 95c7cd0966..5a1b29a1ac 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -818,6 +818,12 @@ public:
QueryState->TxCtx->SetIsolationLevel(settings);
QueryState->TxCtx->OnBeginQuery();
+ if (QueryState->TxCtx->EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
+ && !Settings.TableService.GetEnableSnapshotIsolationRW()) {
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
+ << "Writes aren't supported for Snapshot Isolation";
+ }
+
if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) {
std::vector<TIssue> issues{
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
@@ -893,9 +899,12 @@ public:
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
- QueryState->TxCtx->HasOlapTable |= hasOlapWrite || ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery);
- QueryState->TxCtx->HasOltpTable |= hasOltpWrite || ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery);
+ const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery);
+ const bool hasOltpRead = ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery);
+ QueryState->TxCtx->HasOlapTable |= hasOlapWrite || hasOlapRead;
+ QueryState->TxCtx->HasOltpTable |= hasOltpWrite || hasOltpRead;
QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite;
+ QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead;
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
&& !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
@@ -1130,8 +1139,8 @@ public:
return;
}
- if (QueryState->TxCtx->ShouldExecuteDeferredEffects()) {
- ExecuteDeferredEffectsImmediately();
+ if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) {
+ ExecuteDeferredEffectsImmediately(tx);
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) {
ExecutePhyTx(tx, commit);
} else {
@@ -1139,8 +1148,8 @@ public:
}
}
- void ExecuteDeferredEffectsImmediately() {
- YQL_ENSURE(QueryState->TxCtx->ShouldExecuteDeferredEffects());
+ void ExecuteDeferredEffectsImmediately(const TKqpPhyTxHolder::TConstPtr& tx) {
+ YQL_ENSURE(QueryState->TxCtx->ShouldExecuteDeferredEffects(tx));
auto& txCtx = *QueryState->TxCtx;
auto request = PrepareRequest(/* tx */ nullptr, /* literal */ false, QueryState.get());
@@ -1160,6 +1169,9 @@ public:
txCtx.ClearDeferredEffects();
SendToExecuter(QueryState->TxCtx.Get(), std::move(request));
+ if (!tx) {
+ ++QueryState->CurrentTx;
+ }
}
bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) {
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index bd30804327..38c49472ef 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -4241,7 +4241,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
Y_UNIT_TEST_TWIN(TableSink_Oltp_Replace, UseSink) {
- //UseSink = true;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(UseSink);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
diff --git a/ydb/core/kqp/ut/tx/kqp_sink_common.h b/ydb/core/kqp/ut/tx/kqp_sink_common.h
index 80dae769b1..d7e9075a1e 100644
--- a/ydb/core/kqp/ut/tx/kqp_sink_common.h
+++ b/ydb/core/kqp/ut/tx/kqp_sink_common.h
@@ -25,6 +25,7 @@ public:
void Execute() {
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(!DisableSinks);
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(!DisableSinks);
+ AppConfig.MutableTableServiceConfig()->SetEnableSnapshotIsolationRW(true);
AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
auto settings = TKikimrSettings().SetAppConfig(AppConfig).SetWithSampleTables(false);
if (FastSnapshotExpiration) {
diff --git a/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp b/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp
new file mode 100644
index 0000000000..47fc6216bb
--- /dev/null
+++ b/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp
@@ -0,0 +1,232 @@
+#include "kqp_sink_common.h"
+
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/testlib/common_helper.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NQuery;
+
+Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
+ class TSimple : public TTableDataModificationTester {
+ protected:
+ void DoExecute() override {
+ auto client = Kikimr->GetQueryClient();
+ auto session1 = client.GetSession().GetValueSync().GetSession();
+
+ {
+ auto result = session1.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[300u];["None"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ {
+ auto result = session1.ExecuteQuery(Q_(R"(
+ UPSERT INTO `/Root/Test` (Group, Name, Comment)
+ VALUES (1U, "Paul", "Changed");
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session1.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[300u];["Changed"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+ };
+
+ Y_UNIT_TEST(TSimpleOltp) {
+ TSimple tester;
+ tester.SetIsOlap(false);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TSimpleOltpNoSink) {
+ TSimple tester;
+ tester.SetIsOlap(false);
+ tester.SetDisableSinks(true);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TSimpleOlap) {
+ TSimple tester;
+ tester.SetIsOlap(true);
+ tester.Execute();
+ }
+
+ class TConflictWrite : public TTableDataModificationTester {
+ protected:
+ void DoExecute() override {
+ auto client = Kikimr->GetQueryClient();
+ auto session1 = client.GetSession().GetValueSync().GetSession();
+ auto session2 = client.GetSession().GetValueSync().GetSession();
+
+ auto result = session1.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/KV`;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+
+ result = session2.ExecuteQuery(Q_(R"(
+ UPSERT INTO `/Root/Test` (Group, Name, Comment)
+ VALUES (1U, "Paul", "Changed Other");
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = session1.ExecuteQuery(Q_(R"(
+ UPSERT INTO `/Root/Test` (Group, Name, Comment)
+ VALUES (1U, "Paul", "Changed");
+ )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
+ // Keys changed since taking snapshot.
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+
+ result = session2.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[300u];["Changed Other"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ };
+
+ Y_UNIT_TEST(TConflictWriteOltp) {
+ TConflictWrite tester;
+ tester.SetIsOlap(false);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TConflictWriteOltpNoSink) {
+ TConflictWrite tester;
+ tester.SetIsOlap(false);
+ tester.SetDisableSinks(true);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TConflictWriteOlap) {
+ TConflictWrite tester;
+ tester.SetIsOlap(true);
+ tester.Execute();
+ }
+
+ class TConflictReadWrite : public TTableDataModificationTester {
+ protected:
+ void DoExecute() override {
+ auto client = Kikimr->GetQueryClient();
+ auto session1 = client.GetSession().GetValueSync().GetSession();
+ auto session2 = client.GetSession().GetValueSync().GetSession();
+
+ auto result = session1.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test`;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+
+ result = session2.ExecuteQuery(Q_(R"(
+ UPSERT INTO `/Root/Test` (Group, Name, Comment)
+ VALUES (1U, "NOT Paul", "Changed Other");
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = session1.ExecuteQuery(Q_(R"(
+ UPSERT INTO `/Root/Test` (Group, Name, Comment)
+ VALUES (1U, "Paul", "Changed");
+ )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = session2.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[300u];["Changed"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ };
+
+ Y_UNIT_TEST(TConflictReadWriteOltp) {
+ TConflictReadWrite tester;
+ tester.SetIsOlap(false);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TConflictReadWriteOltpNoSink) {
+ TConflictReadWrite tester;
+ tester.SetIsOlap(false);
+ tester.SetDisableSinks(true);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TConflictReadWriteOlap) {
+ TConflictReadWrite tester;
+ tester.SetIsOlap(true);
+ tester.Execute();
+ }
+
+ class TReadOnly : public TTableDataModificationTester {
+ protected:
+ void DoExecute() override {
+ auto client = Kikimr->GetQueryClient();
+ auto session1 = client.GetSession().GetValueSync().GetSession();
+ auto session2 = client.GetSession().GetValueSync().GetSession();
+
+ auto result = session1.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[300u];["None"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0)));
+
+ auto tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+
+ result = session2.ExecuteQuery(Q_(R"(
+ UPSERT INTO `/Root/Test` (Group, Name, Comment)
+ VALUES (1U, "Paul", "Changed Other");
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = session1.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
+ )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[300u];["None"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0)));
+
+ result = session1.ExecuteQuery(Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
+ )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[300u];["Changed Other"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ };
+
+ Y_UNIT_TEST(TReadOnlyOltp) {
+ TReadOnly tester;
+ tester.SetIsOlap(false);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TReadOnlyOltpNoSink) {
+ TReadOnly tester;
+ tester.SetIsOlap(false);
+ tester.SetDisableSinks(true);
+ tester.Execute();
+ }
+
+ Y_UNIT_TEST(TReadOnlyOlap) {
+ TReadOnly tester;
+ tester.SetIsOlap(true);
+ tester.Execute();
+ }
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/tx/ya.make b/ydb/core/kqp/ut/tx/ya.make
index c1495dd999..0a6133025b 100644
--- a/ydb/core/kqp/ut/tx/ya.make
+++ b/ydb/core/kqp/ut/tx/ya.make
@@ -17,6 +17,7 @@ SRCS(
kqp_sink_locks_ut.cpp
kqp_sink_mvcc_ut.cpp
kqp_sink_tx_ut.cpp
+ kqp_snapshot_isolation_ut.cpp
kqp_tx_ut.cpp
)
diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto
index dacd854792..e856858f49 100644
--- a/ydb/core/protos/data_events.proto
+++ b/ydb/core/protos/data_events.proto
@@ -66,6 +66,11 @@ message TMvccSnapshot {
optional bool RepeatableRead = 12 [default = true];
}
+enum ELockMode {
+ OPTIMISTIC = 0;
+ OPTIMISTIC_SNAPSHOT_ISOLATION = 1;
+}
+
enum EDataFormat {
FORMAT_UNSPECIFIED = 0;
FORMAT_CELLVEC = 1;
@@ -115,6 +120,8 @@ message TEvWrite {
// This mostly affects the minimum MVCC version of the resulting write
optional TMvccSnapshot MvccSnapshot = 8;
optional uint32 GranuleShardingVersionId = 9;
+
+ optional ELockMode LockMode = 10;
}
message TEvWriteResult {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 10d35daaf6..301f7c5f9c 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -64,6 +64,7 @@ enum EIsolationLevel {
ISOLATION_LEVEL_READ_UNCOMMITTED = 3;
ISOLATION_LEVEL_READ_STALE = 4;
ISOLATION_LEVEL_SNAPSHOT_RO = 5;
+ ISOLATION_LEVEL_SNAPSHOT_RW = 6;
};
message TTopicOperationsRequest {
@@ -562,6 +563,8 @@ message TEvStartKqpTasksRequest {
optional double PoolMaxCpuShare = 12;
optional double QueryCpuShare = 16;
optional double ResourceWeight = 18;
+
+ optional NKikimrDataEvents.ELockMode LockMode = 19;
}
message TEvStartKqpTasksResponse {
@@ -735,6 +738,8 @@ message TKqpTableSinkSettings {
optional int64 Priority = 11;
optional bool IsOlap = 12;
repeated uint32 WriteIndexes = 13;
+ optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 14;
+ optional NKikimrDataEvents.ELockMode LockMode = 15;
}
message TKqpStreamLookupSettings {
@@ -751,6 +756,7 @@ message TKqpStreamLookupSettings {
optional bool KeepRowsOrder = 11;
optional bool AllowNullKeys = 12;
optional uint32 AllowNullKeysPrefixSize = 13;
+ optional NKikimrDataEvents.ELockMode LockMode = 14;
}
message TKqpSequencerSettings {
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index e8412b4eae..a7eb638068 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -356,4 +356,6 @@ message TTableServiceConfig {
// This value used for `arrow::Array`s built inside TDqOutputHashPartitionConsumerBlock -
// if the underlying array buffer is filled less than this value, then the buffer's capacity gets shrunk to actual size.
// Otherwise, we potentially don't track the real buffer capacity and it may lead to OOM situations inside DqOutputChannel's.
+
+ optional bool EnableSnapshotIsolationRW = 76 [default = false];
};
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 6019cc753c..62a76826fb 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -261,6 +261,8 @@ message TKqpReadRangesSourceSettings {
optional bool AllowInconsistentReads = 18 [default = false];
repeated TKqpTransaction.TColumnMeta DuplicateCheckColumns = 19;
+
+ optional NKikimrDataEvents.ELockMode LockMode = 20;
}
message TKqpTaskInfo {
@@ -298,6 +300,7 @@ message TDataTransaction {
// Datashard will subscribe to lock status when node id is non-zero
optional uint32 LockNodeId = 17;
+ optional NKikimrDataEvents.ELockMode LockMode = 18;
}
message TCreateVolatileSnapshot {
@@ -1702,6 +1705,7 @@ message TEvKqpScan {
optional uint32 LockNodeId = 25;
optional string CSScanPolicy = 26;
optional NKikimrKqp.TEvKqpScanCursor ScanCursor = 27;
+ optional NKikimrDataEvents.ELockMode LockMode = 28;
}
message TEvCompactTable {
@@ -1887,6 +1891,8 @@ message TEvRead {
optional bytes Program = 902;
optional NKikimrSchemeOp.EOlapProgramType ProgramType = 903;
+
+ optional NKikimrDataEvents.ELockMode LockMode = 904;
}
message TReadContinuationToken {
diff --git a/ydb/public/api/protos/ydb_query.proto b/ydb/public/api/protos/ydb_query.proto
index b1fabe26d5..252f3dc43b 100644
--- a/ydb/public/api/protos/ydb_query.proto
+++ b/ydb/public/api/protos/ydb_query.proto
@@ -62,6 +62,8 @@ message StaleModeSettings {
message SnapshotModeSettings {
}
+message SnapshotRWModeSettings {
+}
message TransactionSettings {
oneof tx_mode {
@@ -69,6 +71,7 @@ message TransactionSettings {
OnlineModeSettings online_read_only = 2;
StaleModeSettings stale_read_only = 3;
SnapshotModeSettings snapshot_read_only = 4;
+ SnapshotRWModeSettings snapshot_read_write = 5;
}
}
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index dbfeafcac6..f21a3b2749 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -879,12 +879,16 @@ message StaleModeSettings {
message SnapshotModeSettings {
}
+message SnapshotRWModeSettings {
+}
+
message TransactionSettings {
oneof tx_mode {
SerializableModeSettings serializable_read_write = 1;
OnlineModeSettings online_read_only = 2;
StaleModeSettings stale_read_only = 3;
SnapshotModeSettings snapshot_read_only = 4;
+ SnapshotRWModeSettings snapshot_read_write = 5;
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index 331f17ec42..2a5f47a696 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -48,6 +48,9 @@ static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::Transaction
case TTxSettings::TS_SNAPSHOT_RO:
proto->mutable_snapshot_read_only();
break;
+ case TTxSettings::TS_SNAPSHOT_RW:
+ proto->mutable_snapshot_read_write();
+ break;
default:
throw TContractViolation("Unexpected transaction mode.");
}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
index 93b91c5ac2..897a7ee8a3 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
@@ -32,6 +32,9 @@ static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::Transaction
case TTxSettings::TS_SNAPSHOT_RO:
proto->mutable_snapshot_read_only();
break;
+ case TTxSettings::TS_SNAPSHOT_RW:
+ proto->mutable_snapshot_read_write();
+ break;
default:
throw TContractViolation("Unexpected transaction mode.");
}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/tx.h b/ydb/public/sdk/cpp/client/ydb_query/tx.h
index 749ddd00db..d6e291f316 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/tx.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/tx.h
@@ -34,6 +34,10 @@ struct TTxSettings {
return TTxSettings(TS_SNAPSHOT_RO);
}
+ static TTxSettings SnapshotRW() {
+ return TTxSettings(TS_SNAPSHOT_RW);
+ }
+
void Out(IOutputStream& out) const {
switch (Mode_) {
case TS_SERIALIZABLE_RW:
@@ -48,6 +52,9 @@ struct TTxSettings {
case TS_SNAPSHOT_RO:
out << "SnapshotRO";
break;
+ case TS_SNAPSHOT_RW:
+ out << "SnapshotRW";
+ break;
default:
out << "Unknown";
break;
@@ -58,7 +65,8 @@ struct TTxSettings {
TS_SERIALIZABLE_RW,
TS_ONLINE_RO,
TS_STALE_RO,
- TS_SNAPSHOT_RO
+ TS_SNAPSHOT_RO,
+ TS_SNAPSHOT_RW,
};
FLUENT_SETTING(TTxOnlineSettings, OnlineSettings);
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index 38a8481866..3e6dfa8d1f 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -1134,6 +1134,9 @@ void TTableClient::TImpl::SetTxSettings(const TTxSettings& txSettings, Ydb::Tabl
case TTxSettings::TS_SNAPSHOT_RO:
proto->mutable_snapshot_read_only();
break;
+ case TTxSettings::TS_SNAPSHOT_RW:
+ proto->mutable_snapshot_read_write();
+ break;
default:
throw TContractViolation("Unexpected transaction mode.");
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index 5c2ce1b28e..a6d2f68dca 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -1317,6 +1317,10 @@ public:
return TTxSettings(TS_SNAPSHOT_RO);
}
+ static TTxSettings SnapshotRW() {
+ return TTxSettings(TS_SNAPSHOT_RW);
+ }
+
void Out(IOutputStream& out) const {
switch (Mode_) {
case TS_SERIALIZABLE_RW:
@@ -1331,6 +1335,9 @@ public:
case TS_SNAPSHOT_RO:
out << "SnapshotRO";
break;
+ case TS_SNAPSHOT_RW:
+ out << "SnapshotRW";
+ break;
default:
out << "Unknown";
break;
@@ -1342,7 +1349,8 @@ private:
TS_SERIALIZABLE_RW,
TS_ONLINE_RO,
TS_STALE_RO,
- TS_SNAPSHOT_RO
+ TS_SNAPSHOT_RO,
+ TS_SNAPSHOT_RW,
};
FLUENT_SETTING(TTxOnlineSettings, OnlineSettings);