diff options
author | abcdef <akotov@ydb.tech> | 2022-09-19 16:27:08 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2022-09-19 16:27:08 +0300 |
commit | b5a861d056d17322efb73efb580e0d32238e452c (patch) | |
tree | c9aea63a248b60876f0eb27471ebdd1ae7f0aab6 | |
parent | 7495c426ef6955671d7ec18ac944a69ab180dab2 (diff) | |
download | ydb-b5a861d056d17322efb73efb580e0d32238e452c.tar.gz |
-rw-r--r-- | ydb/core/kqp/common/kqp_gateway.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_topic.cpp | 312 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_topic.h | 99 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_transform.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 174 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_executer_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 85 | ||||
-rw-r--r-- | ydb/core/persqueue/events/global.h | 13 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 35 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 7 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 53 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_topic.proto | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp | 10 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/topic_service_ut.cpp | 108 |
15 files changed, 786 insertions, 125 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h index f7d18b682dd..196f6098281 100644 --- a/ydb/core/kqp/common/kqp_gateway.h +++ b/ydb/core/kqp/common/kqp_gateway.h @@ -12,6 +12,8 @@ #include <library/cpp/actors/core/actorid.h> #include <library/cpp/lwtrace/shuttle.h> +#include <ydb/core/kqp/common/kqp_topic.h> + namespace NKikimr { namespace NKqp { @@ -130,6 +132,8 @@ public: NLWTrace::TOrbit Orbit; NWilson::TTraceId TraceId; + + NTopic::TTopicOperations TopicOperations; }; struct TExecPhysicalResult : public TGenericResult { diff --git a/ydb/core/kqp/common/kqp_topic.cpp b/ydb/core/kqp/common/kqp_topic.cpp index 7aaaca9e75f..e1fc6b54877 100644 --- a/ydb/core/kqp/common/kqp_topic.cpp +++ b/ydb/core/kqp/common/kqp_topic.cpp @@ -6,78 +6,237 @@ namespace NKikimr::NKqp::NTopic { // -// TPartition +// TConsumerOperations // -void TPartition::AddRange(const Ydb::Topic::OffsetsRange &range) +bool TConsumerOperations::IsValid() const { - AddRangeImpl(range.start(), range.end()); + return Offsets_.GetNumIntervals() == 1; } -void TPartition::SetTabletId(ui64 value) +std::pair<ui64, ui64> TConsumerOperations::GetRange() const { - TabletId_ = value; + Y_VERIFY(IsValid()); + + return {Offsets_.Min(), Offsets_.Max()}; } -void TPartition::Merge(const TPartition& rhs) +void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range) { + Y_VERIFY(Consumer_.Empty() || Consumer_ == consumer); + + AddOperationImpl(consumer, range.start(), range.end()); +} + +void TConsumerOperations::Merge(const TConsumerOperations& rhs) +{ + Y_VERIFY(rhs.Consumer_.Defined()); + Y_VERIFY(Consumer_.Empty() || Consumer_ == rhs.Consumer_); + for (auto& range : rhs.Offsets_) { - AddRangeImpl(range.first, range.second); + AddOperationImpl(*rhs.Consumer_, range.first, range.second); } } -void TPartition::AddRangeImpl(ui64 begin, ui64 end) +void TConsumerOperations::AddOperationImpl(const TString& consumer, + ui64 begin, ui64 end) { if (Offsets_.Intersects(begin, end)) { ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect"; } + if (Consumer_.Empty()) { + Consumer_ = consumer; + } + Offsets_.InsertInterval(begin, end); } // -// TTopic +// TTopicPartitionOperations // -TPartition& TTopic::AddPartition(ui32 id) +bool TTopicPartitionOperations::IsValid() const { - return Partitions_[id]; + return std::all_of(Operations_.begin(), Operations_.end(), + [](auto& x) { return x.second.IsValid(); }); } -TPartition* TTopic::GetPartition(ui32 id) +void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition, + const TString& consumer, + const Ydb::Topic::OffsetsRange& range) { - auto p = Partitions_.find(id); - if (p == Partitions_.end()) { - return nullptr; + Y_VERIFY(Topic_.Empty() || Topic_ == topic); + Y_VERIFY(Partition_.Empty() || Partition_ == partition); + + if (Topic_.Empty()) { + Topic_ = topic; + Partition_ = partition; + } + + Operations_[consumer].AddOperation(consumer, range); +} + +void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition) +{ + Y_VERIFY(Topic_.Empty() || Topic_ == topic); + Y_VERIFY(Partition_.Empty() || Partition_ == partition); + + if (Topic_.Empty()) { + Topic_ = topic; + Partition_ = partition; + } + + HasWriteOperations_ = true; +} + +void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs) +{ + Y_VERIFY(TabletId_.Defined()); + Y_VERIFY(Partition_.Defined()); + + auto& tx = txs[*TabletId_]; + + for (auto& [consumer, operations] : Operations_) { + NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add(); + o->SetPartitionId(*Partition_); + auto [begin, end] = operations.GetRange(); + o->SetBegin(begin); + o->SetEnd(end); + o->SetConsumer(consumer); + o->SetPath(*Topic_); + } + + if (HasWriteOperations_) { + NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add(); + o->SetPartitionId(*Partition_); + o->SetPath(*Topic_); } - return &p->second; } -void TTopic::Merge(const TTopic& rhs) +void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs) { - for (auto& [name, partition] : rhs.Partitions_) { - Partitions_[name].Merge(partition); + Y_VERIFY(Topic_.Empty() || Topic_ == rhs.Topic_); + Y_VERIFY(Partition_.Empty() || Partition_ == rhs.Partition_); + Y_VERIFY(TabletId_.Empty() || TabletId_ == rhs.TabletId_); + + if (Topic_.Empty()) { + Topic_ = rhs.Topic_; + Partition_ = rhs.Partition_; + TabletId_ = rhs.TabletId_; + } + + for (auto& [key, value] : rhs.Operations_) { + Operations_[key].Merge(value); } + + HasWriteOperations_ |= rhs.HasWriteOperations_; +} + +ui64 TTopicPartitionOperations::GetTabletId() const +{ + Y_VERIFY(TabletId_.Defined()); + + return *TabletId_; +} + +void TTopicPartitionOperations::SetTabletId(ui64 value) +{ + Y_VERIFY(TabletId_.Empty()); + + TabletId_ = value; +} + +bool TTopicPartitionOperations::HasReadOperations() const +{ + return !Operations_.empty(); +} + +bool TTopicPartitionOperations::HasWriteOperations() const +{ + return HasWriteOperations_; +} + +// +// TTopicPartition +// +TTopicPartition::TTopicPartition(TString topic, ui32 partition) : + Topic_{std::move(topic)}, + Partition_{partition} +{ +} + +bool TTopicPartition::operator==(const TTopicPartition& x) const +{ + return (Topic_ == x.Topic_) && (Partition_ == x.Partition_); +} + +size_t TTopicPartition::THash::operator()(const TTopicPartition& x) const +{ + size_t hash = std::hash<TString>{}(x.Topic_); + hash = CombineHashes(hash, std::hash<ui32>{}(x.Partition_)); + return hash; } // -// TOffsetsInfo +// TTopicOperations // -TTopic& TOffsetsInfo::AddTopic(const TString &path) +bool TTopicOperations::IsValid() const { - return Topics_[path]; + return std::all_of(Operations_.begin(), Operations_.end(), + [](auto& x) { return x.second.IsValid(); }); } -TTopic *TOffsetsInfo::GetTopic(const TString &path) +bool TTopicOperations::HasOperations() const { - auto p = Topics_.find(path); - if (p == Topics_.end()) { - return nullptr; + return HasReadOperations() || HasWriteOperations(); +} + +bool TTopicOperations::HasReadOperations() const +{ + return HasReadOperations_; +} + +bool TTopicOperations::HasWriteOperations() const +{ + return HasWriteOperations_; +} + +bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const +{ + for (auto& [_, value] : Operations_) { + if (value.GetTabletId() == tabletId) { + return value.HasReadOperations(); + } } - return &p->second; + return false; +} + +void TTopicOperations::AddOperation(const TString& topic, ui32 partition, + const TString& consumer, + const Ydb::Topic::OffsetsRange& range) +{ + TTopicPartition key{topic, partition}; + Operations_[key].AddOperation(topic, partition, + consumer, + range); + HasReadOperations_ = true; +} + +void TTopicOperations::AddOperation(const TString& topic, ui32 partition) +{ + TTopicPartition key{topic, partition}; + Operations_[key].AddOperation(topic, partition); + HasWriteOperations_ = true; } -void TOffsetsInfo::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const +void TTopicOperations::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate, + TMaybe<TString> consumer) { - for (auto& [topic, _] : Topics_) { + TSet<TString> topics; + for (auto& [key, _] : Operations_) { + topics.insert(key.Topic_); + } + + for (auto& topic : topics) { NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Path = NKikimr::SplitPath(topic); entry.SyncVersion = true; @@ -86,17 +245,17 @@ void TOffsetsInfo::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& n navigate.ResultSet.push_back(std::move(entry)); } + + Consumer_ = std::move(consumer); } -bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results, - Ydb::StatusIds_StatusCode& status, - TString& message) +bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results, + Ydb::StatusIds_StatusCode& status, + TString& message) { - Y_VERIFY(results.size() == Topics_.size()); - if (results.empty()) { status = Ydb::StatusIds::BAD_REQUEST; - message = "empty request"; + message = "Request is empty"; return false; } @@ -116,24 +275,42 @@ bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNa const NKikimrSchemeOp::TPersQueueGroupDescription& description = result.PQGroupInfo->Description; - TTopic* topic = GetTopic(CanonizePath(result.Path)); + if (Consumer_) { + bool found = false; - // - // TODO: если топика нет, то отправить сообщение с ошибкой SCHEME_ERROR - // - Y_VERIFY(topic != nullptr); + for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) { + if (Consumer_ == consumer) { + found = true; + break; + } + } + + if (!found) { + builder << "Unknown consumer '" << *Consumer_ << "'"; - for (auto& p : description.GetPartitions()) { - if (auto partition = topic->GetPartition(p.GetPartitionId()); partition) { - LOG_D("topic=" << description.GetName() - << ", partition=" << p.GetPartitionId() - << ", tabletId=" << p.GetTabletId()); + status = Ydb::StatusIds::BAD_REQUEST; + message = std::move(builder); - partition->SetTabletId(p.GetTabletId()); + return false; + } + } + + TString path = CanonizePath(result.Path); + + for (auto& partition : description.GetPartitions()) { + TTopicPartition key{path, partition.GetPartitionId()}; + + if (auto p = Operations_.find(key); p != Operations_.end()) { + LOG_D("(topic, partition, tablet): " + << "'" << key.Topic_ << "'" + << ", " << partition.GetPartitionId() + << ", " << partition.GetTabletId()); + + p->second.SetTabletId(partition.GetTabletId()); } } } else { - builder << "The '" << JoinPath(result.Path) << "' topic is missing"; + builder << "Topic '" << JoinPath(result.Path) << "' is missing"; status = Ydb::StatusIds::SCHEME_ERROR; message = std::move(builder); @@ -148,11 +325,46 @@ bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNa return true; } -void TOffsetsInfo::Merge(const TOffsetsInfo& rhs) +void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs) +{ + for (auto& [_, operations] : Operations_) { + operations.BuildTopicTxs(txs); + } +} + +void TTopicOperations::Merge(const TTopicOperations& rhs) { - for (auto& [name, topic] : rhs.Topics_) { - Topics_[name].Merge(topic); + for (auto& [key, value] : rhs.Operations_) { + Operations_[key].Merge(value); } + + HasReadOperations_ |= rhs.HasReadOperations_; + HasWriteOperations_ |= rhs.HasWriteOperations_; +} + +TSet<ui64> TTopicOperations::GetReceivingTabletIds() const +{ + TSet<ui64> ids; + for (auto& [_, operations] : Operations_) { + if (operations.HasWriteOperations()) { + ids.insert(operations.GetTabletId()); + } + } + return ids; +} + +TSet<ui64> TTopicOperations::GetSendingTabletIds() const +{ + TSet<ui64> ids; + for (auto& [_, operations] : Operations_) { + ids.insert(operations.GetTabletId()); + } + return ids; +} + +size_t TTopicOperations::GetSize() const +{ + return Operations_.size(); } } diff --git a/ydb/core/kqp/common/kqp_topic.h b/ydb/core/kqp/common/kqp_topic.h index e319c441dc7..70219893655 100644 --- a/ydb/core/kqp/common/kqp_topic.h +++ b/ydb/core/kqp/common/kqp_topic.h @@ -2,6 +2,7 @@ #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/public/api/protos/ydb_topic.pb.h> +#include <ydb/core/protos/pqconfig.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -15,47 +16,107 @@ #include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> -#include <vector> - namespace NKikimr::NKqp::NTopic { class TOffsetsRangeIntersectExpection : public yexception { }; -struct TPartition { - void AddRange(const Ydb::Topic::OffsetsRange &range); - void SetTabletId(ui64 value); +class TConsumerOperations { +public: + bool IsValid() const; + + std::pair<ui64, ui64> GetRange() const; - void Merge(const TPartition& rhs); + ui64 GetBegin() const; + ui64 GetEnd() const; + + void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range); + void Merge(const TConsumerOperations& rhs); private: - void AddRangeImpl(ui64 begin, ui64 end); + void AddOperationImpl(const TString& consumer, + ui64 begin, ui64 end); + TMaybe<TString> Consumer_; TDisjointIntervalTree<ui64> Offsets_; - ui64 TabletId_ = Max<ui64>(); }; -struct TTopic { - TPartition& AddPartition(ui32 id); - TPartition* GetPartition(ui32 id); +class TTopicPartitionOperations { +public: + bool IsValid() const; + + void AddOperation(const TString& topic, ui32 partition, + const TString& consumer, + const Ydb::Topic::OffsetsRange& range); + void AddOperation(const TString& topic, ui32 partition); + + void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs); - void Merge(const TTopic& rhs); + void Merge(const TTopicPartitionOperations& rhs); - THashMap<ui32, TPartition> Partitions_; + void SetTabletId(ui64 value); + ui64 GetTabletId() const; + + bool HasReadOperations() const; + bool HasWriteOperations() const; + +private: + TMaybe<TString> Topic_; + TMaybe<ui32> Partition_; + THashMap<TString, TConsumerOperations> Operations_; + bool HasWriteOperations_ = false; + TMaybe<ui64> TabletId_; }; -struct TOffsetsInfo { - TTopic& AddTopic(const TString &path); - TTopic* GetTopic(const TString &path); +struct TTopicPartition { + struct THash { + size_t operator()(const TTopicPartition& x) const; + }; + + TTopicPartition(TString topic, ui32 partition); + + bool operator==(const TTopicPartition& x) const; - void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const; + TString Topic_; + ui32 Partition_; +}; + +class TTopicOperations { +public: + bool IsValid() const; + + bool HasOperations() const; + bool HasReadOperations() const; + bool HasWriteOperations() const; + + bool TabletHasReadOperations(ui64 tabletId) const; + + void AddOperation(const TString& topic, ui32 partition, + const TString& consumer, + const Ydb::Topic::OffsetsRange& range); + void AddOperation(const TString& topic, ui32 partition); + + void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate, + TMaybe<TString> consumer); bool ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results, Ydb::StatusIds_StatusCode& status, TString& message); - void Merge(const TOffsetsInfo& rhs); + void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs); + + void Merge(const TTopicOperations& rhs); + + TSet<ui64> GetReceivingTabletIds() const; + TSet<ui64> GetSendingTabletIds() const; + + size_t GetSize() const; + +private: + THashMap<TTopicPartition, TTopicPartitionOperations, TTopicPartition::THash> Operations_; + bool HasReadOperations_ = false; + bool HasWriteOperations_ = false; - THashMap<TString, TTopic> Topics_; + TMaybe<TString> Consumer_; }; } diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h index 93e85ab7b33..a9b1face3b0 100644 --- a/ydb/core/kqp/common/kqp_transform.h +++ b/ydb/core/kqp/common/kqp_transform.h @@ -241,10 +241,6 @@ public: ForceNewEngineSettings.ForceNewEngineLevel = level; } - void MergeTopicOffsets(const NTopic::TOffsetsInfo &offsets) { - TopicOffsets.Merge(offsets); - } - public: struct TParamsState : public TThrRefBase { TParamValueMap Values; @@ -265,7 +261,7 @@ public: TKqpTxLocks Locks; TDeferredEffects DeferredEffects; - NTopic::TOffsetsInfo TopicOffsets; + NTopic::TTopicOperations TopicOperations; TIntrusivePtr<TParamsState> ParamsState; IKqpGateway::TKqpSnapshotHandle SnapshotHandle; diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 07135d24a18..d6255e721a1 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -22,6 +22,7 @@ #include <ydb/core/tx/long_tx_service/public/events.h> #include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/persqueue/events/global.h> #include <ydb/library/yql/dq/runtime/dq_columns_resolve.h> #include <ydb/library/yql/dq/tasks/dq_connection_builder.h> @@ -196,6 +197,7 @@ private: hFunc(TEvDataShard::TEvProposeTransactionResult, HandlePrepare); hFunc(TEvDataShard::TEvProposeTransactionRestart, HandleExecute); hFunc(TEvDataShard::TEvProposeTransactionAttachResult, HandlePrepare); + hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare); hFunc(TEvPrivate::TEvReattachToShard, HandleExecute); hFunc(TEvDqCompute::TEvState, HandlePrepare); // from CA hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA @@ -214,6 +216,21 @@ private: ReportEventElapsedTime(); } + void HandlePrepare(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) { + NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record; + + LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: " + << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); + + TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin()); + YQL_ENSURE(state); + + YQL_ENSURE(event.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::ERROR); + + auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_ABORTED); + ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issue); + } + void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) { TEvDataShard::TEvProposeTransactionResult* res = ev->Get(); const ui64 shardId = res->GetOrigin(); @@ -529,6 +546,10 @@ private: auto& affectedSet = *transaction.MutableAffectedSet(); affectedSet.Reserve(static_cast<int>(ShardStates.size())); + // + // TODO(abcdef): учесть таблетки топиков + // + ui64 aggrMinStep = 0; ui64 aggrMaxStep = Max<ui64>(); ui64 totalReadSize = 0; @@ -564,6 +585,10 @@ private: } } + // + // TODO(abcdef): учесть таблетки топиков + // + item.SetFlags(affectedFlags); } @@ -597,6 +622,7 @@ private: hFunc(TEvDataShard::TEvProposeTransactionResult, HandleExecute); hFunc(TEvDataShard::TEvProposeTransactionRestart, HandleExecute); hFunc(TEvDataShard::TEvProposeTransactionAttachResult, HandleExecute); + hFunc(TEvPersQueue::TEvProposeTransactionResult, HandleExecute); hFunc(TEvPrivate::TEvReattachToShard, HandleExecute); hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute); hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute); @@ -613,6 +639,21 @@ private: ReportEventElapsedTime(); } + void HandleExecute(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) { + NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record; + + LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: " + << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); + + TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin()); + YQL_ENSURE(state); + + YQL_ENSURE(event.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::ERROR); + + auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_ABORTED); + ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issue); + } + void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) { TEvDataShard::TEvProposeTransactionResult* res = ev->Get(); const ui64 shardId = res->GetOrigin(); @@ -932,6 +973,12 @@ private: LOG_D("Datashard " << x.first << " not finished yet: " << ToString(x.second.State)); } } + for (auto& x : TopicTabletStates) { + if (x.second.State != TShardState::EState::Finished) { + ++notFinished; + LOG_D("TopicTablet " << x.first << " not finished yet: " << ToString(x.second.State)); + } + } if (notFinished == 0 && PendingComputeActors.empty()) { Finalize(); return; @@ -948,6 +995,11 @@ private: sb << "DS " << shardId << " (" << ToString(shardState.State) << "), "; } } + for (auto& [tabletId, tabletState] : TopicTabletStates) { + if (tabletState.State != TShardState::EState::Finished) { + sb << "PQ " << tabletId << " (" << ToString(tabletState.State) << "), "; + } + } LOG_D(sb); } } @@ -1308,7 +1360,7 @@ private: LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); RequestControls.Reqister(TlsActivationContext->AsActorContext()); - ReadOnlyTx = true; + ReadOnlyTx = !Request.TopicOperations.HasOperations(); auto& funcRegistry = *AppData()->FunctionRegistry; NMiniKQL::TScopedAlloc alloc(TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators()); @@ -1507,10 +1559,12 @@ private: return; } - auto datashardTxs = BuildDatashardTxs(datashardTasks); + TTopicTabletTxs topicTxs; + auto datashardTxs = BuildDatashardTxs(datashardTasks, topicTxs); // Single-shard transactions are always immediate - ImmediateTx = datashardTxs.size() <= 1; + ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize()) <= 1; + switch (Request.IsolationLevel) { // OnlineRO with AllowInconsistentReads = true case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED: @@ -1541,6 +1595,7 @@ private: if (forceSnapshot) { ComputeTasks = std::move(computeTasks); DatashardTxs = std::move(datashardTxs); + TopicTxs = std::move(topicTxs); auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId()); Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database)); @@ -1558,7 +1613,8 @@ private: if (prepareTasksSpan) { prepareTasksSpan.End(); } - ContinueExecute(computeTasks, datashardTxs); + + ContinueExecute(computeTasks, datashardTxs, topicTxs); } STATEFN(WaitSnapshotState) { @@ -1589,15 +1645,21 @@ private: auto computeTasks = std::move(ComputeTasks); auto datashardTxs = std::move(DatashardTxs); - ContinueExecute(computeTasks, datashardTxs); + auto topicTxs = std::move(TopicTxs); + + ContinueExecute(computeTasks, datashardTxs, topicTxs); } - void ContinueExecute( - TVector<NDqProto::TDqTask>& computeTasks, - THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>& datashardTxs) + using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>; + using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TKqpTransaction>; + + void ContinueExecute(TVector<NDqProto::TDqTask>& computeTasks, + TDatashardTxs& datashardTxs, + TTopicTabletTxs& topicTxs) { UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE; - if (datashardTxs.size() > 1) { + + if (!ImmediateTx) { // Followers only allowed for single shard transactions. // (legacy behaviour, for compatibility with current execution engine) UseFollowers = false; @@ -1616,7 +1678,7 @@ private: //Stats->ComputeStats.reserve(computeTasks.size()); } - Execute(computeTasks, datashardTxs); + Execute(computeTasks, datashardTxs, topicTxs); if (ImmediateTx) { LOG_T("Immediate tx, become ExecuteState"); @@ -1635,8 +1697,9 @@ private: } } - THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> BuildDatashardTxs(const THashMap<ui64, TVector<NDqProto::TDqTask>>& datashardTasks) { - THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> datashardTxs; + TDatashardTxs BuildDatashardTxs(const THashMap<ui64, TVector<NDqProto::TDqTask>>& datashardTasks, + TTopicTabletTxs& topicTxs) { + TDatashardTxs datashardTxs; for (auto& [shardId, tasks]: datashardTasks) { auto& dsTxs = datashardTxs[shardId]; @@ -1645,8 +1708,11 @@ private: } } - if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty()) { + Request.TopicOperations.BuildTopicTxs(topicTxs); + + if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty() || Request.TopicOperations.HasReadOperations()) { YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks); + auto locksOp = Request.ValidateLocks && Request.EraseLocks ? NKikimrTxDataShard::TKqpLocks::Commit : (Request.ValidateLocks @@ -1660,6 +1726,8 @@ private: taskShardIds.insert(shardId); } } + + taskShardIds.merge(Request.TopicOperations.GetReceivingTabletIds()); } TSet<ui64> locksSendingShards; @@ -1671,26 +1739,48 @@ private: tx.MutableLocks()->MutableLocks()->Add()->Swap(&lock); } - if (!locksList.empty() && Request.ValidateLocks) { + if ((!locksList.empty() || Request.TopicOperations.HasReadOperations()) && Request.ValidateLocks) { locksSendingShards.insert(shardId); } } + locksSendingShards.merge(Request.TopicOperations.GetSendingTabletIds()); + if (Request.ValidateLocks) { NProtoBuf::RepeatedField<ui64> sendingShards(locksSendingShards.begin(), locksSendingShards.end()); NProtoBuf::RepeatedField<ui64> receivingShards(taskShardIds.begin(), taskShardIds.end()); + for (auto& [shardId, shardTx] : datashardTxs) { shardTx.MutableLocks()->SetOp(locksOp); shardTx.MutableLocks()->MutableSendingShards()->CopyFrom(sendingShards); shardTx.MutableLocks()->MutableReceivingShards()->CopyFrom(receivingShards); } + + for (auto& [_, tx] : topicTxs) { + switch (locksOp) { + case NKikimrTxDataShard::TKqpLocks::Commit: + tx.SetOp(NKikimrPQ::TKqpTransaction::Commit); + break; + case NKikimrTxDataShard::TKqpLocks::Validate: + tx.SetOp(NKikimrPQ::TKqpTransaction::Validate); + break; + case NKikimrTxDataShard::TKqpLocks::Rollback: + tx.SetOp(NKikimrPQ::TKqpTransaction::Rollback); + break; + default: + break; + } + + tx.MutableSendingShards()->CopyFrom(sendingShards); + tx.MutableReceivingShards()->CopyFrom(receivingShards); + } } } return datashardTxs; } - void Execute(TVector<NDqProto::TDqTask>& computeTasks, THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>& datashardTxs) { + void Execute(TVector<NDqProto::TDqTask>& computeTasks, TDatashardTxs& datashardTxs, TTopicTabletTxs& topicTxs) { auto lockTxId = Request.AcquireLocksTxId; if (lockTxId.Defined() && *lockTxId == 0) { lockTxId = TxId; @@ -1756,6 +1846,8 @@ private: ExecuteDatashardTransaction(shardId, shardTx, lockTxId); } + ExecuteTopicTabletTransactions(topicTxs); + if (sendTasksSpan) { sendTasksSpan.End(); } @@ -1763,6 +1855,7 @@ private: LOG_I("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: " << ReadOnlyTx << ", datashardTxs: " << datashardTxs.size() + << ", topicTxs: " << Request.TopicOperations.GetSize() << ", immediate: " << ImmediateTx << ", useFollowers: " << UseFollowers); @@ -1773,9 +1866,56 @@ private: CollectTaskChannelsUpdates(task, updates); } PropagateChannelsUpdates(updates); + CheckExecutionComplete(); } + void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) { + auto lockTxId = Request.AcquireLocksTxId; + if (lockTxId.Defined() && *lockTxId == 0) { + lockTxId = TxId; + } + + for (auto& tx : topicTxs) { + auto tabletId = tx.first; + auto& transaction = tx.second; + + auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>(); + + if (lockTxId) { + transaction.SetLockTxId(*lockTxId); + } + transaction.SetImmediate(ImmediateTx); + + ActorIdToProto(SelfId(), ev->Record.MutableSource()); + ev->Record.MutableTxBody()->Swap(&transaction); + ev->Record.SetTxId(TxId); + + auto traceId = ExecuterSpan.GetTraceId(); + LOG_D("ExecuteTopicTabletTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity())); + + LOG_D("Executing KQP transaction on topic tablet: " << tabletId + << ", lockTxId: " << lockTxId); + + Send(MakePipePeNodeCacheID(UseFollowers), + new TEvPipeCache::TEvForward(ev.release(), tabletId, true), + 0, + 0, + std::move(traceId)); + + TShardState state; + state.State = + ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing; + state.DatashardState.ConstructInPlace(); + state.DatashardState->Follower = UseFollowers; + + state.DatashardState->ShardReadLocks = Request.TopicOperations.TabletHasReadOperations(tabletId); + + auto result = TopicTabletStates.emplace(tabletId, std::move(state)); + YQL_ENSURE(result.second); + } + } + void Finalize() { auto& response = *ResponseEv->Record.MutableResponse(); @@ -1929,6 +2069,7 @@ private: NTxProxy::TRequestControls RequestControls; ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; + THashMap<ui64, TShardState> TopicTabletStates; TVector<NKikimrTxDataShard::TLock> Locks; TVector<TKqpExecuterTxResult> Results; bool ReadOnlyTx = true; @@ -1948,7 +2089,8 @@ private: // Temporary storage during snapshot acquisition TVector<NDqProto::TDqTask> ComputeTasks; - THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> DatashardTxs; + TDatashardTxs DatashardTxs; + TTopicTabletTxs TopicTxs; // Lock handle for a newly acquired lock TLockHandle LockHandle; diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index 250299766d2..583bcd6011c 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -13,6 +13,7 @@ #include <ydb/core/base/wilson.h> #include <ydb/core/base/kikimr_issue.h> #include <ydb/core/protos/tx_datashard.pb.h> +#include <ydb/core/protos/pqconfig.pb.h> #include <ydb/core/kqp/executer/kqp_tasks_graph.h> #include <ydb/core/kqp/kqp.h> #include <ydb/core/grpc_services/local_rate_limiter.h> diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 37916452c8b..28ea418d34e 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -98,7 +98,7 @@ struct TKqpQueryState { TString TxId_Human = ""; bool Commit = false; - NTopic::TOffsetsInfo Offsets; + NTopic::TTopicOperations TopicOperations; }; struct TKqpCleanupCtx { @@ -424,24 +424,40 @@ public: return; } - QueryState->Offsets = NTopic::TOffsetsInfo(); + YQL_ENSURE(QueryState->Request.HasTopicOperations()); - for (auto& topic : QueryState->Request.topics()) { - auto path = CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->Request.GetDatabase(), topic.path())); - auto& t = QueryState->Offsets.AddTopic(path); + const NKikimrKqp::TTopicOperations& operations = QueryState->Request.GetTopicOperations(); - for (auto& partition : topic.partitions()) { - auto& p = t.AddPartition(partition.partition_id()); + TMaybe<TString> consumer; + if (operations.HasConsumer()) { + consumer = operations.GetConsumer(); + } + + QueryState->TopicOperations = NTopic::TTopicOperations(); + + for (auto& topic : operations.GetTopics()) { + auto path = + CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->Request.GetDatabase(), topic.path())); - for (auto& range : partition.partition_offsets()) { - p.AddRange(range); + for (auto& partition : topic.partitions()) { + if (partition.partition_offsets().empty()) { + QueryState->TopicOperations.AddOperation(path, partition.partition_id()); + } else { + for (auto& range : partition.partition_offsets()) { + YQL_ENSURE(consumer.Defined()); + + QueryState->TopicOperations.AddOperation(path, partition.partition_id(), + *consumer, + range); + } } } } auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); navigate->DatabaseName = CanonizePath(QueryState->Request.GetDatabase()); - QueryState->Offsets.FillSchemeCacheNavigate(*navigate); + QueryState->TopicOperations.FillSchemeCacheNavigate(*navigate, + std::move(consumer)); navigate->UserToken = new NACLib::TUserToken(QueryState->UserToken); Become(&TKqpSessionActor::TopicOpsState); @@ -961,6 +977,23 @@ public: return true; } + bool CheckTopicOperations() { + auto& txCtx = *QueryState->TxCtx; + + if (txCtx.TopicOperations.IsValid()) { + return true; + } + + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + std::vector<TIssue> issues { + YqlIssue({}, TIssuesIds::KIKIMR_BAD_REQUEST, "Incorrect offset ranges in the transaction.") + }; + ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "incorrect offset ranges in the tx", + MessageFromIssues(issues)); + + return false; + } + void ExecuteOrDefer() { if (!(QueryState->PreparedQuery && QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize())) @@ -991,7 +1024,7 @@ public: break; } } - if (!CheckTransacionLocks()) { + if (!CheckTransacionLocks() || !CheckTopicOperations()) { return; } @@ -1014,6 +1047,10 @@ public: LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit << " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size()); + if (!CheckTopicOperations()) { + return true; + } + // TODO Handle timeouts -- request.Timeout, request.CancelAfter if (tx) { @@ -1029,7 +1066,8 @@ public: request.Transactions.emplace_back(tx, PrepareParameters(*tx)); } else { YQL_ENSURE(commit); - if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks()) { + + if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) { ReplySuccess(); return true; } @@ -1039,6 +1077,7 @@ public: for (const auto& effect : txCtx.DeferredEffects) { YQL_ENSURE(!effect.Node); YQL_ENSURE(effect.PhysicalTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA); + request.Transactions.emplace_back(effect.PhysicalTx, GetParamsRefMap(effect.Params)); LOG_D("TExecPhysicalRequest, add DeferredEffect to Transaction," @@ -1049,9 +1088,11 @@ public: request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef(); } - if (txCtx.Locks.HasLocks()) { - request.ValidateLocks = !(txCtx.GetSnapshot().IsValid() && txCtx.DeferredEffects.Empty()); + if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasReadOperations()) { + request.ValidateLocks = !(txCtx.GetSnapshot().IsValid() && txCtx.DeferredEffects.Empty()) || + txCtx.TopicOperations.HasReadOperations(); request.EraseLocks = true; + LOG_D("TExecPhysicalRequest, tx has locks, ValidateLocks: " << request.ValidateLocks << " EraseLocks: " << request.EraseLocks); @@ -1059,6 +1100,8 @@ public: request.Locks.emplace_back(lock.GetValueRef(txCtx.Locks.LockType)); } } + + request.TopicOperations = std::move(txCtx.TopicOperations); } else if (ShouldAcquireLocks(query)) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); } @@ -1069,7 +1112,9 @@ public: request.Transactions.size(), request.Locks.size(), request.AcquireLocksTxId.Defined()); + SendToExecuter(std::move(request)); + return false; } @@ -2081,14 +2126,14 @@ private: ythrow TRequestFail(requestInfo, Ydb::StatusIds::SCHEME_ERROR) << message; } - QueryState->Offsets.ProcessSchemeCacheNavigate(response->ResultSet, - status, - message); + QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet, + status, + message); if (status != Ydb::StatusIds::SUCCESS) { ythrow TRequestFail(requestInfo, status) << message; } - if (!TryMergeTopicOffsets(QueryState->Offsets, message)) { + if (!TryMergeTopicOffsets(QueryState->TopicOperations, message)) { ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << message; } @@ -2145,10 +2190,10 @@ private: return true; } - bool TryMergeTopicOffsets(const NTopic::TOffsetsInfo &offsets, TString& message) { + bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message) { try { YQL_ENSURE(QueryState); - QueryState->TxCtx->MergeTopicOffsets(offsets); + QueryState->TxCtx->TopicOperations.Merge(operations); return true; } catch (const NTopic::TOffsetsRangeIntersectExpection &ex) { message = ex.what(); diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index e1957e7a16f..5ea9600c343 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -8,7 +8,6 @@ #include <ydb/core/protos/msgbus.pb.h> #include <ydb/public/api/protos/draft/persqueue_common.pb.h> - namespace NKikimr { struct TEvPersQueue { @@ -43,6 +42,9 @@ struct TEvPersQueue { EvGetPartitionIdForWrite, EvGetPartitionIdForWriteResponse, EvReportPartitionError, + EvProposeTransaction, + EvProposeTransactionResult, + EvCancelTransactionProposal, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -218,5 +220,14 @@ struct TEvPersQueue { {} }; + struct TEvProposeTransaction : public TEventPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> { + }; + + struct TEvProposeTransactionResult : public TEventPB<TEvProposeTransactionResult, NKikimrPQ::TEvProposeTransactionResult, EvProposeTransactionResult> { + }; + + struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> { + }; + }; } //NKikimr diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index d242264ffc2..9160db6f6c7 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -2154,6 +2154,40 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } +void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) +{ + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction. topicPath=" << TopicPath); + + NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record; + const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody(); + + for (auto& operation : txBody.GetOperations()) { + Y_VERIFY(!operation.HasPath() || (operation.GetPath() == TopicPath)); + + bool isWriteOperation = !operation.HasBegin(); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "tx=" << event.GetTxId() << + ", lock_tx_id=" << txBody.GetLockTxId() << + ", path=" << operation.GetPath() << + ", partition=" << operation.GetPartitionId() << + ", consumer=" << operation.GetConsumer() << + ", begin=" << operation.GetBegin() << + ", end=" << operation.GetEnd() << + ", is_write=" << isWriteOperation); + } + + auto result = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>(); + + result->Record.SetOrigin(TabletID()); + result->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ERROR); + result->Record.SetTxId(event.GetTxId()); + //result->Record.SetMinStep(); + //result->Record.SetMaxStep(); + //result->Record.SetStep(); + + ctx.Send(ActorIdFromProto(event.GetSource()), result.release()); +} bool TPersQueue::HandleHook(STFUNC_SIG) { @@ -2181,6 +2215,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvPQ::TEvError, Handle); HFuncTraced(TEvPQ::TEvProxyResponse, Handle); CFunc(TEvents::TSystem::Wakeup, HandleWakeup); + HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle); default: return false; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 07ba60620c6..73fa4f3cd0b 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -34,6 +34,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void HandleWakeup(const TActorContext&); + void Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext&); + void InitResponseBuilder(const ui64 responseCookie, const ui32 count, const ui32 counterId); void Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext&); void Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index bb647ed258c..836057d074d 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -73,6 +73,11 @@ message TKqlSettings { optional bool NewEngine = 6; }; +message TTopicOperations { + optional string Consumer = 1; + repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 2; +} + message TQueryRequest { optional bytes SessionId = 1; optional string Query = 2; @@ -95,7 +100,7 @@ message TQueryRequest { reserved 19; // (deprecated) StatsMode optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21; - repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 22; + optional TTopicOperations TopicOperations = 22; } message TKqpPathIdProto { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 466b72ef1b9..f4fb341596c 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -739,3 +739,56 @@ message TYdsShardIterator { required uint64 CreationTimestampMs = 6; optional ETopicKind Kind = 7; } + +message TPartitionOperation { + required uint64 PartitionId = 1; + optional uint64 Begin = 2; + optional uint64 End = 3; + optional string Consumer = 4; + required string Path = 5; // topic path +}; + +message TKqpTransaction { + enum ELocksOp { + Unspecified = 0; + Validate = 1; + Commit = 2; + Rollback = 3; + } + + repeated TPartitionOperation Operations = 1; + required ELocksOp Op = 2; + repeated uint64 SendingShards = 3; + repeated uint64 ReceivingShards = 4; + required bool Immediate = 5; + optional fixed64 LockTxId = 6; +} + +message TEvProposeTransaction { + required NActorsProto.TActorId Source = 1; + required TKqpTransaction TxBody = 2; + required uint64 TxId = 3; +}; + +message TEvProposeTransactionResult { + enum EStatus { + PREPARED = 1; + COMPLETE = 2; + ABORTED = 3; + ERROR = 4; + LOCKS_BROKEN = 5; + CANCELLED = 6; + BAD_REQUEST = 7; + }; + + required uint64 Origin = 1; // Tablet Id + required EStatus Status = 2; + required uint64 TxId = 3; + optional uint64 MinStep = 4; + optional uint64 MaxStep = 6; + optional uint64 Step = 7; +}; + +message TEvCancelTransactionProposal { + required uint64 TxId = 1; +}; diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 4501e3fee78..f310bf16eef 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -509,6 +509,8 @@ message AddOffsetsToTransactionRequest { // Ranges of offsets by topics. repeated TopicOffsets topics = 4; + string consumer = 5; + message TopicOffsets { // Topic path. string path = 1; diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp index b6d1e4933b0..5c962458e80 100644 --- a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp +++ b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp @@ -29,9 +29,6 @@ void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx) return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); } - // - // TODO: новый тип запроса. например, QUERY_TYPE_TOPIC - // ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_TOPIC); @@ -57,11 +54,8 @@ void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx) // ev->Record.MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control()); - - // - // скопировать информацию о смещениях - // - *ev->Record.MutableRequest()->MutableTopics() = req->Gettopics(); + ev->Record.MutableRequest()->MutableTopicOperations()->SetConsumer(req->consumer()); + *ev->Record.MutableRequest()->MutableTopicOperations()->MutableTopics() = req->topics(); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); } diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp index 089a960073f..1390f3b7071 100644 --- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp +++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp @@ -97,6 +97,7 @@ void AppendTopic(const TTopic &t, Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_id, const TString& tx_id, + const TString& consumer, const TVector<TTopic>& topics) { Ydb::Topic::AddOffsetsToTransactionRequest request; @@ -104,6 +105,8 @@ Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_ request.set_session_id(session_id); request.mutable_tx_control()->set_tx_id(tx_id); + request.set_consumer(consumer); + for (auto& t : topics) { AppendTopic(t, request.mutable_topics()); } @@ -140,10 +143,15 @@ protected: , NKikimrServices::TX_PROXY_SCHEME_CACHE , NKikimrServices::KQP_PROXY , NKikimrServices::PERSQUEUE + , NKikimrServices::KQP_EXECUTER , NKikimrServices::KQP_SESSION}, NActors::NLog::PRI_DEBUG); auto partsCount = 5u; - server->AnnoyingClient->CreateTopicNoLegacy(VALID_TOPIC_PATH, partsCount); + server->AnnoyingClient->CreateTopicNoLegacy(VALID_TOPIC_PATH, partsCount, + true, + true, + Nothing(), + {"c0nsumer", "consumer-1", "consumer-2"}); NACLib::TDiffACL acl; acl.AddAccess(NACLib::EAccessType::Allow, NACLib::DescribeSchema, AUTH_TOKEN); @@ -164,7 +172,8 @@ protected: stub = CreateTopicServiceTxStub(*server); } - Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics) { + Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics, + const TString& consumer = "c0nsumer") { grpc::ClientContext rcontext; rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN); rcontext.AddMetadata("x-ydb-database", DATABASE); @@ -172,7 +181,8 @@ protected: Ydb::Topic::AddOffsetsToTransactionResponse response; grpc::Status status = stub->AddOffsetsToTransaction(&rcontext, - CreateRequest(session->GetId(), tx->GetId(), topics), + CreateRequest(session->GetId(), tx->GetId(), + consumer, topics), &response); UNIT_ASSERT(status.ok()); @@ -204,7 +214,7 @@ protected: } }; -Y_UNIT_TEST_F(TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) { +Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) { Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ @@ -228,7 +238,7 @@ Y_UNIT_TEST_F(TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); } -Y_UNIT_TEST_F(TheRangesOverlap, TAddOffsetToTransactionFixture) { +Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) { Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ TPartition{.Id=4, .Offsets={ @@ -252,6 +262,41 @@ Y_UNIT_TEST_F(TheRangesOverlap, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST); } +Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixture) { + Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=1, .End=3}, + TOffsetRange{.Begin=5, .End=8} + }}, + TPartition{.Id=1, .Offsets={ + TOffsetRange{.Begin=2, .End=6} + }} + }} + }, "consumer-1"); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=4, .End=7} + }} + }} + }, "consumer-2"); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); +} + +Y_UNIT_TEST_F(UnknownConsumer, TAddOffsetToTransactionFixture) { + auto response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=4, .End=7} + }} + }} + }, "unknown-consumer"); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST); +} + Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) { auto response = CallAddOffsetsToTransaction({ TTopic{.Path=INVALID_TOPIC_PATH, .Partitions={ @@ -295,6 +340,59 @@ Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAUTHORIZED); } +Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TAddOffsetToTransactionFixture) { + auto response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=1, .Offsets={ + TOffsetRange{.Begin=0, .End=2}, + TOffsetRange{.Begin=4, .End=6} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + auto result = tx->Commit().ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED); +} + +Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) { + auto response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=1, .Offsets={ + TOffsetRange{.Begin=0, .End=2} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + Cerr << "<<< CommitTx <<<" << Endl; + auto result = tx->Commit().ExtractValueSync(); + Cerr << ">>> CommitTx >>>" << Endl; + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED); +} + +Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) { + auto response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=1, .Offsets={ + TOffsetRange{.Begin=0, .End=2} + }}, + TPartition{.Id=2, .Offsets={ + TOffsetRange{.Begin=0, .End=4} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + Cerr << "<<< CommitTx <<<" << Endl; + auto result = tx->Commit().ExtractValueSync(); + Cerr << ">>> CommitTx >>>" << Endl; + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED); +} + } } |