diff options
| author | Nikita Vasilev <[email protected]> | 2025-06-24 13:10:10 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-06-24 13:10:10 +0300 |
| commit | 3a99069cf3656d577efeeeb64a831926bf670080 (patch) | |
| tree | 52743cff9694e01d6ac5c12e4bc999df1ce423f3 | |
| parent | 93107dd77879cf9d0237916f57b89e7dd0a2fdcc (diff) | |
Fix sink flags (#19985)
| -rw-r--r-- | ydb/core/kqp/common/kqp_tx.h | 4 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 63 | ||||
| -rw-r--r-- | ydb/core/protos/kqp_physical.proto | 4 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/ut/topic_service_ut.cpp | 6 |
6 files changed, 69 insertions, 13 deletions
diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 77bdb4345f8..2e9bf7c54bb 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -360,6 +360,10 @@ public: bool HasTableWrite = false; bool HasTableRead = false; + std::optional<bool> EnableOltpSink; + std::optional<bool> EnableOlapSink; + std::optional<bool> EnableHtapTx; + bool NeedUncommittedChangesFlush = false; THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 45c3489d43e..9800dce71de 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -115,7 +115,6 @@ public: { Target = creator; - YQL_ENSURE(!TxManager || executerConfig.TableServiceConfig.GetEnableOltpSink()); YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); if (Request.AcquireLocksTxId || Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback) { diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index d7d1464ed9c..ccdf306fcb7 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -610,6 +610,10 @@ public: YQL_ENSURE(querySettings.Type); queryProto.SetType(GetPhyQueryType(*querySettings.Type)); + queryProto.SetEnableOltpSink(Config->EnableOltpSink); + queryProto.SetEnableOlapSink(Config->EnableOlapSink); + queryProto.SetEnableHtapTx(Config->EnableHtapTx); + for (const auto& queryBlock : dataQueryBlocks) { auto queryBlockSettings = TKiDataQueryBlockSettings::Parse(queryBlock); if (queryBlockSettings.HasUncommittedChangesRead) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 0f58b4e2f60..4fd45dfcff3 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -928,6 +928,55 @@ public: } const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); + + auto checkSchemeTx = [&]() { + for (const auto &tx : phyQuery.GetTransactions()) { + if (tx.GetType() != NKqpProto::TKqpPhyTx::TYPE_SCHEME) { + return false; + } + } + return true; + }; + + if (phyQuery.HasEnableOltpSink()) { + if (!QueryState->TxCtx->EnableOltpSink) { + QueryState->TxCtx->EnableOltpSink = phyQuery.GetEnableOltpSink(); + } + if (QueryState->TxCtx->EnableOltpSink != phyQuery.GetEnableOltpSink()) { + ReplyQueryError(Ydb::StatusIds::ABORTED, + "Transaction execution settings have been changed (EnableOltpSink)."); + return false; + } + } else { + AFL_ENSURE(checkSchemeTx()); + } + + if (phyQuery.HasEnableOlapSink()) { + if (!QueryState->TxCtx->EnableOlapSink) { + QueryState->TxCtx->EnableOlapSink = phyQuery.GetEnableOlapSink(); + } + if (QueryState->TxCtx->EnableOlapSink != phyQuery.GetEnableOlapSink()) { + ReplyQueryError(Ydb::StatusIds::ABORTED, + "Transaction execution settings have been changed (EnableOlapSink)."); + return false; + } + } else { + AFL_ENSURE(checkSchemeTx()); + } + + if (phyQuery.HasEnableHtapTx()) { + if (!QueryState->TxCtx->EnableHtapTx) { + QueryState->TxCtx->EnableHtapTx = phyQuery.GetEnableHtapTx(); + } + if (QueryState->TxCtx->EnableHtapTx != phyQuery.GetEnableHtapTx()) { + ReplyQueryError(Ydb::StatusIds::ABORTED, + "Transaction execution settings have been changed (EnableHtapTx)."); + return false; + } + } else { + AFL_ENSURE(checkSchemeTx()); + } + const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery); @@ -937,7 +986,7 @@ public: QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite; QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead; if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite - && !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) { + && !QueryState->TxCtx->EnableHtapTx.value_or(false) && !QueryState->IsSplitted()) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "Write transactions between column and row tables are disabled at current time."); return false; @@ -1178,7 +1227,7 @@ public: return; } - if (Settings.TableService.GetEnableOltpSink() && isBatchQuery) { + if (QueryState->TxCtx->EnableOltpSink.value_or(false) && isBatchQuery) { if (!Settings.TableService.GetEnableBatchUpdates()) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "BATCH operations are disabled by EnableBatchUpdates flag."); @@ -1353,7 +1402,7 @@ public: request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef(); } - if (Settings.TableService.GetEnableOltpSink()) { + if (txCtx.EnableOltpSink.value_or(false)) { if (txCtx.TxHasEffects() || hasLocks || txCtx.TopicOperations.HasOperations()) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); } @@ -1390,7 +1439,7 @@ public: } } request.TopicOperations = std::move(txCtx.TopicOperations); - } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) { + } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || txCtx.EnableOlapSink.value_or(false))) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); if (!txCtx.CanDeferEffects()) { @@ -1453,12 +1502,12 @@ public: request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - if (Settings.TableService.GetEnableOltpSink() && !txCtx->TxManager) { + if (txCtx->EnableOltpSink.value_or(false) && !txCtx->TxManager) { txCtx->TxManager = CreateKqpTransactionManager(); txCtx->TxManager->SetAllowVolatile(AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions()); } - if (Settings.TableService.GetEnableOltpSink() + if (txCtx->EnableOltpSink.value_or(false) && !txCtx->BufferActorId && (txCtx->HasTableWrite || request.TopicOperations.GetSize() != 0)) { txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations)); @@ -1493,7 +1542,7 @@ public: }; auto* actor = CreateKqpBufferWriterActor(std::move(settings)); txCtx->BufferActorId = RegisterWithSameMailbox(actor); - } else if (Settings.TableService.GetEnableOltpSink() && txCtx->BufferActorId) { + } else if (txCtx->EnableOltpSink.value_or(false) && txCtx->BufferActorId) { txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations)); txCtx->TxManager->AddTopicsToShards(); } diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 29c7165c379..49d225fe66e 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -582,4 +582,8 @@ message TKqpPhyQuery { string QueryDiagnostics = 10; repeated TKqpTableInfo ViewInfos = 11; + + optional bool EnableOltpSink = 12; + optional bool EnableOlapSink = 13; + optional bool EnableHtapTx = 14; } diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp index 2ce697fe169..7a5e518e715 100644 --- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp +++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp @@ -412,11 +412,7 @@ Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransacti auto result = tx->Commit().ExtractValueSync(); Cerr << ">>> CommitTx >>>" << Endl; UNIT_ASSERT_EQUAL(result.IsTransportError(), false); - if (server->ServerSettings.AppConfig->GetTableServiceConfig().GetEnableOltpSink()) { - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST); - } else { - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED); - } + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED); } } |
