summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <[email protected]>2025-06-24 13:10:10 +0300
committerGitHub <[email protected]>2025-06-24 13:10:10 +0300
commit3a99069cf3656d577efeeeb64a831926bf670080 (patch)
tree52743cff9694e01d6ac5c12e4bc999df1ce423f3
parent93107dd77879cf9d0237916f57b89e7dd0a2fdcc (diff)
Fix sink flags (#19985)
-rw-r--r--ydb/core/kqp/common/kqp_tx.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp1
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp4
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp63
-rw-r--r--ydb/core/protos/kqp_physical.proto4
-rw-r--r--ydb/services/persqueue_v1/ut/topic_service_ut.cpp6
6 files changed, 69 insertions, 13 deletions
diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h
index 77bdb4345f8..2e9bf7c54bb 100644
--- a/ydb/core/kqp/common/kqp_tx.h
+++ b/ydb/core/kqp/common/kqp_tx.h
@@ -360,6 +360,10 @@ public:
bool HasTableWrite = false;
bool HasTableRead = false;
+ std::optional<bool> EnableOltpSink;
+ std::optional<bool> EnableOlapSink;
+ std::optional<bool> EnableHtapTx;
+
bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 45c3489d43e..9800dce71de 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -115,7 +115,6 @@ public:
{
Target = creator;
- YQL_ENSURE(!TxManager || executerConfig.TableServiceConfig.GetEnableOltpSink());
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
if (Request.AcquireLocksTxId || Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback) {
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index d7d1464ed9c..ccdf306fcb7 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -610,6 +610,10 @@ public:
YQL_ENSURE(querySettings.Type);
queryProto.SetType(GetPhyQueryType(*querySettings.Type));
+ queryProto.SetEnableOltpSink(Config->EnableOltpSink);
+ queryProto.SetEnableOlapSink(Config->EnableOlapSink);
+ queryProto.SetEnableHtapTx(Config->EnableHtapTx);
+
for (const auto& queryBlock : dataQueryBlocks) {
auto queryBlockSettings = TKiDataQueryBlockSettings::Parse(queryBlock);
if (queryBlockSettings.HasUncommittedChangesRead) {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 0f58b4e2f60..4fd45dfcff3 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -928,6 +928,55 @@ public:
}
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
+
+ auto checkSchemeTx = [&]() {
+ for (const auto &tx : phyQuery.GetTransactions()) {
+ if (tx.GetType() != NKqpProto::TKqpPhyTx::TYPE_SCHEME) {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ if (phyQuery.HasEnableOltpSink()) {
+ if (!QueryState->TxCtx->EnableOltpSink) {
+ QueryState->TxCtx->EnableOltpSink = phyQuery.GetEnableOltpSink();
+ }
+ if (QueryState->TxCtx->EnableOltpSink != phyQuery.GetEnableOltpSink()) {
+ ReplyQueryError(Ydb::StatusIds::ABORTED,
+ "Transaction execution settings have been changed (EnableOltpSink).");
+ return false;
+ }
+ } else {
+ AFL_ENSURE(checkSchemeTx());
+ }
+
+ if (phyQuery.HasEnableOlapSink()) {
+ if (!QueryState->TxCtx->EnableOlapSink) {
+ QueryState->TxCtx->EnableOlapSink = phyQuery.GetEnableOlapSink();
+ }
+ if (QueryState->TxCtx->EnableOlapSink != phyQuery.GetEnableOlapSink()) {
+ ReplyQueryError(Ydb::StatusIds::ABORTED,
+ "Transaction execution settings have been changed (EnableOlapSink).");
+ return false;
+ }
+ } else {
+ AFL_ENSURE(checkSchemeTx());
+ }
+
+ if (phyQuery.HasEnableHtapTx()) {
+ if (!QueryState->TxCtx->EnableHtapTx) {
+ QueryState->TxCtx->EnableHtapTx = phyQuery.GetEnableHtapTx();
+ }
+ if (QueryState->TxCtx->EnableHtapTx != phyQuery.GetEnableHtapTx()) {
+ ReplyQueryError(Ydb::StatusIds::ABORTED,
+ "Transaction execution settings have been changed (EnableHtapTx).");
+ return false;
+ }
+ } else {
+ AFL_ENSURE(checkSchemeTx());
+ }
+
const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery);
@@ -937,7 +986,7 @@ public:
QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite;
QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead;
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
- && !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) {
+ && !QueryState->TxCtx->EnableHtapTx.value_or(false) && !QueryState->IsSplitted()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Write transactions between column and row tables are disabled at current time.");
return false;
@@ -1178,7 +1227,7 @@ public:
return;
}
- if (Settings.TableService.GetEnableOltpSink() && isBatchQuery) {
+ if (QueryState->TxCtx->EnableOltpSink.value_or(false) && isBatchQuery) {
if (!Settings.TableService.GetEnableBatchUpdates()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"BATCH operations are disabled by EnableBatchUpdates flag.");
@@ -1353,7 +1402,7 @@ public:
request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef();
}
- if (Settings.TableService.GetEnableOltpSink()) {
+ if (txCtx.EnableOltpSink.value_or(false)) {
if (txCtx.TxHasEffects() || hasLocks || txCtx.TopicOperations.HasOperations()) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
}
@@ -1390,7 +1439,7 @@ public:
}
}
request.TopicOperations = std::move(txCtx.TopicOperations);
- } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
+ } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || txCtx.EnableOlapSink.value_or(false))) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
if (!txCtx.CanDeferEffects()) {
@@ -1453,12 +1502,12 @@ public:
request.ResourceManager_ = ResourceManager_;
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
- if (Settings.TableService.GetEnableOltpSink() && !txCtx->TxManager) {
+ if (txCtx->EnableOltpSink.value_or(false) && !txCtx->TxManager) {
txCtx->TxManager = CreateKqpTransactionManager();
txCtx->TxManager->SetAllowVolatile(AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions());
}
- if (Settings.TableService.GetEnableOltpSink()
+ if (txCtx->EnableOltpSink.value_or(false)
&& !txCtx->BufferActorId
&& (txCtx->HasTableWrite || request.TopicOperations.GetSize() != 0)) {
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
@@ -1493,7 +1542,7 @@ public:
};
auto* actor = CreateKqpBufferWriterActor(std::move(settings));
txCtx->BufferActorId = RegisterWithSameMailbox(actor);
- } else if (Settings.TableService.GetEnableOltpSink() && txCtx->BufferActorId) {
+ } else if (txCtx->EnableOltpSink.value_or(false) && txCtx->BufferActorId) {
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
txCtx->TxManager->AddTopicsToShards();
}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 29c7165c379..49d225fe66e 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -582,4 +582,8 @@ message TKqpPhyQuery {
string QueryDiagnostics = 10;
repeated TKqpTableInfo ViewInfos = 11;
+
+ optional bool EnableOltpSink = 12;
+ optional bool EnableOlapSink = 13;
+ optional bool EnableHtapTx = 14;
}
diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
index 2ce697fe169..7a5e518e715 100644
--- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
+++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
@@ -412,11 +412,7 @@ Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransacti
auto result = tx->Commit().ExtractValueSync();
Cerr << ">>> CommitTx >>>" << Endl;
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
- if (server->ServerSettings.AppConfig->GetTableServiceConfig().GetEnableOltpSink()) {
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
- }
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
}
}