aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2022-09-19 16:27:08 +0300
committerabcdef <akotov@ydb.tech>2022-09-19 16:27:08 +0300
commitb5a861d056d17322efb73efb580e0d32238e452c (patch)
treec9aea63a248b60876f0eb27471ebdd1ae7f0aab6
parent7495c426ef6955671d7ec18ac944a69ab180dab2 (diff)
downloadydb-b5a861d056d17322efb73efb580e0d32238e452c.tar.gz
-rw-r--r--ydb/core/kqp/common/kqp_gateway.h4
-rw-r--r--ydb/core/kqp/common/kqp_topic.cpp312
-rw-r--r--ydb/core/kqp/common/kqp_topic.h99
-rw-r--r--ydb/core/kqp/common/kqp_transform.h6
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp174
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.h1
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp85
-rw-r--r--ydb/core/persqueue/events/global.h13
-rw-r--r--ydb/core/persqueue/pq_impl.cpp35
-rw-r--r--ydb/core/persqueue/pq_impl.h2
-rw-r--r--ydb/core/protos/kqp.proto7
-rw-r--r--ydb/core/protos/pqconfig.proto53
-rw-r--r--ydb/public/api/protos/ydb_topic.proto2
-rw-r--r--ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp10
-rw-r--r--ydb/services/persqueue_v1/ut/topic_service_ut.cpp108
15 files changed, 786 insertions, 125 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h
index f7d18b682dd..196f6098281 100644
--- a/ydb/core/kqp/common/kqp_gateway.h
+++ b/ydb/core/kqp/common/kqp_gateway.h
@@ -12,6 +12,8 @@
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/lwtrace/shuttle.h>
+#include <ydb/core/kqp/common/kqp_topic.h>
+
namespace NKikimr {
namespace NKqp {
@@ -130,6 +132,8 @@ public:
NLWTrace::TOrbit Orbit;
NWilson::TTraceId TraceId;
+
+ NTopic::TTopicOperations TopicOperations;
};
struct TExecPhysicalResult : public TGenericResult {
diff --git a/ydb/core/kqp/common/kqp_topic.cpp b/ydb/core/kqp/common/kqp_topic.cpp
index 7aaaca9e75f..e1fc6b54877 100644
--- a/ydb/core/kqp/common/kqp_topic.cpp
+++ b/ydb/core/kqp/common/kqp_topic.cpp
@@ -6,78 +6,237 @@
namespace NKikimr::NKqp::NTopic {
//
-// TPartition
+// TConsumerOperations
//
-void TPartition::AddRange(const Ydb::Topic::OffsetsRange &range)
+bool TConsumerOperations::IsValid() const
{
- AddRangeImpl(range.start(), range.end());
+ return Offsets_.GetNumIntervals() == 1;
}
-void TPartition::SetTabletId(ui64 value)
+std::pair<ui64, ui64> TConsumerOperations::GetRange() const
{
- TabletId_ = value;
+ Y_VERIFY(IsValid());
+
+ return {Offsets_.Min(), Offsets_.Max()};
}
-void TPartition::Merge(const TPartition& rhs)
+void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range)
{
+ Y_VERIFY(Consumer_.Empty() || Consumer_ == consumer);
+
+ AddOperationImpl(consumer, range.start(), range.end());
+}
+
+void TConsumerOperations::Merge(const TConsumerOperations& rhs)
+{
+ Y_VERIFY(rhs.Consumer_.Defined());
+ Y_VERIFY(Consumer_.Empty() || Consumer_ == rhs.Consumer_);
+
for (auto& range : rhs.Offsets_) {
- AddRangeImpl(range.first, range.second);
+ AddOperationImpl(*rhs.Consumer_, range.first, range.second);
}
}
-void TPartition::AddRangeImpl(ui64 begin, ui64 end)
+void TConsumerOperations::AddOperationImpl(const TString& consumer,
+ ui64 begin, ui64 end)
{
if (Offsets_.Intersects(begin, end)) {
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
}
+ if (Consumer_.Empty()) {
+ Consumer_ = consumer;
+ }
+
Offsets_.InsertInterval(begin, end);
}
//
-// TTopic
+// TTopicPartitionOperations
//
-TPartition& TTopic::AddPartition(ui32 id)
+bool TTopicPartitionOperations::IsValid() const
{
- return Partitions_[id];
+ return std::all_of(Operations_.begin(), Operations_.end(),
+ [](auto& x) { return x.second.IsValid(); });
}
-TPartition* TTopic::GetPartition(ui32 id)
+void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
+ const TString& consumer,
+ const Ydb::Topic::OffsetsRange& range)
{
- auto p = Partitions_.find(id);
- if (p == Partitions_.end()) {
- return nullptr;
+ Y_VERIFY(Topic_.Empty() || Topic_ == topic);
+ Y_VERIFY(Partition_.Empty() || Partition_ == partition);
+
+ if (Topic_.Empty()) {
+ Topic_ = topic;
+ Partition_ = partition;
+ }
+
+ Operations_[consumer].AddOperation(consumer, range);
+}
+
+void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition)
+{
+ Y_VERIFY(Topic_.Empty() || Topic_ == topic);
+ Y_VERIFY(Partition_.Empty() || Partition_ == partition);
+
+ if (Topic_.Empty()) {
+ Topic_ = topic;
+ Partition_ = partition;
+ }
+
+ HasWriteOperations_ = true;
+}
+
+void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs)
+{
+ Y_VERIFY(TabletId_.Defined());
+ Y_VERIFY(Partition_.Defined());
+
+ auto& tx = txs[*TabletId_];
+
+ for (auto& [consumer, operations] : Operations_) {
+ NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
+ o->SetPartitionId(*Partition_);
+ auto [begin, end] = operations.GetRange();
+ o->SetBegin(begin);
+ o->SetEnd(end);
+ o->SetConsumer(consumer);
+ o->SetPath(*Topic_);
+ }
+
+ if (HasWriteOperations_) {
+ NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
+ o->SetPartitionId(*Partition_);
+ o->SetPath(*Topic_);
}
- return &p->second;
}
-void TTopic::Merge(const TTopic& rhs)
+void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
{
- for (auto& [name, partition] : rhs.Partitions_) {
- Partitions_[name].Merge(partition);
+ Y_VERIFY(Topic_.Empty() || Topic_ == rhs.Topic_);
+ Y_VERIFY(Partition_.Empty() || Partition_ == rhs.Partition_);
+ Y_VERIFY(TabletId_.Empty() || TabletId_ == rhs.TabletId_);
+
+ if (Topic_.Empty()) {
+ Topic_ = rhs.Topic_;
+ Partition_ = rhs.Partition_;
+ TabletId_ = rhs.TabletId_;
+ }
+
+ for (auto& [key, value] : rhs.Operations_) {
+ Operations_[key].Merge(value);
}
+
+ HasWriteOperations_ |= rhs.HasWriteOperations_;
+}
+
+ui64 TTopicPartitionOperations::GetTabletId() const
+{
+ Y_VERIFY(TabletId_.Defined());
+
+ return *TabletId_;
+}
+
+void TTopicPartitionOperations::SetTabletId(ui64 value)
+{
+ Y_VERIFY(TabletId_.Empty());
+
+ TabletId_ = value;
+}
+
+bool TTopicPartitionOperations::HasReadOperations() const
+{
+ return !Operations_.empty();
+}
+
+bool TTopicPartitionOperations::HasWriteOperations() const
+{
+ return HasWriteOperations_;
+}
+
+//
+// TTopicPartition
+//
+TTopicPartition::TTopicPartition(TString topic, ui32 partition) :
+ Topic_{std::move(topic)},
+ Partition_{partition}
+{
+}
+
+bool TTopicPartition::operator==(const TTopicPartition& x) const
+{
+ return (Topic_ == x.Topic_) && (Partition_ == x.Partition_);
+}
+
+size_t TTopicPartition::THash::operator()(const TTopicPartition& x) const
+{
+ size_t hash = std::hash<TString>{}(x.Topic_);
+ hash = CombineHashes(hash, std::hash<ui32>{}(x.Partition_));
+ return hash;
}
//
-// TOffsetsInfo
+// TTopicOperations
//
-TTopic& TOffsetsInfo::AddTopic(const TString &path)
+bool TTopicOperations::IsValid() const
{
- return Topics_[path];
+ return std::all_of(Operations_.begin(), Operations_.end(),
+ [](auto& x) { return x.second.IsValid(); });
}
-TTopic *TOffsetsInfo::GetTopic(const TString &path)
+bool TTopicOperations::HasOperations() const
{
- auto p = Topics_.find(path);
- if (p == Topics_.end()) {
- return nullptr;
+ return HasReadOperations() || HasWriteOperations();
+}
+
+bool TTopicOperations::HasReadOperations() const
+{
+ return HasReadOperations_;
+}
+
+bool TTopicOperations::HasWriteOperations() const
+{
+ return HasWriteOperations_;
+}
+
+bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
+{
+ for (auto& [_, value] : Operations_) {
+ if (value.GetTabletId() == tabletId) {
+ return value.HasReadOperations();
+ }
}
- return &p->second;
+ return false;
+}
+
+void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
+ const TString& consumer,
+ const Ydb::Topic::OffsetsRange& range)
+{
+ TTopicPartition key{topic, partition};
+ Operations_[key].AddOperation(topic, partition,
+ consumer,
+ range);
+ HasReadOperations_ = true;
+}
+
+void TTopicOperations::AddOperation(const TString& topic, ui32 partition)
+{
+ TTopicPartition key{topic, partition};
+ Operations_[key].AddOperation(topic, partition);
+ HasWriteOperations_ = true;
}
-void TOffsetsInfo::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const
+void TTopicOperations::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate,
+ TMaybe<TString> consumer)
{
- for (auto& [topic, _] : Topics_) {
+ TSet<TString> topics;
+ for (auto& [key, _] : Operations_) {
+ topics.insert(key.Topic_);
+ }
+
+ for (auto& topic : topics) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = NKikimr::SplitPath(topic);
entry.SyncVersion = true;
@@ -86,17 +245,17 @@ void TOffsetsInfo::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& n
navigate.ResultSet.push_back(std::move(entry));
}
+
+ Consumer_ = std::move(consumer);
}
-bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results,
- Ydb::StatusIds_StatusCode& status,
- TString& message)
+bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results,
+ Ydb::StatusIds_StatusCode& status,
+ TString& message)
{
- Y_VERIFY(results.size() == Topics_.size());
-
if (results.empty()) {
status = Ydb::StatusIds::BAD_REQUEST;
- message = "empty request";
+ message = "Request is empty";
return false;
}
@@ -116,24 +275,42 @@ bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNa
const NKikimrSchemeOp::TPersQueueGroupDescription& description =
result.PQGroupInfo->Description;
- TTopic* topic = GetTopic(CanonizePath(result.Path));
+ if (Consumer_) {
+ bool found = false;
- //
- // TODO: если топика нет, то отправить сообщение с ошибкой SCHEME_ERROR
- //
- Y_VERIFY(topic != nullptr);
+ for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) {
+ if (Consumer_ == consumer) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ builder << "Unknown consumer '" << *Consumer_ << "'";
- for (auto& p : description.GetPartitions()) {
- if (auto partition = topic->GetPartition(p.GetPartitionId()); partition) {
- LOG_D("topic=" << description.GetName()
- << ", partition=" << p.GetPartitionId()
- << ", tabletId=" << p.GetTabletId());
+ status = Ydb::StatusIds::BAD_REQUEST;
+ message = std::move(builder);
- partition->SetTabletId(p.GetTabletId());
+ return false;
+ }
+ }
+
+ TString path = CanonizePath(result.Path);
+
+ for (auto& partition : description.GetPartitions()) {
+ TTopicPartition key{path, partition.GetPartitionId()};
+
+ if (auto p = Operations_.find(key); p != Operations_.end()) {
+ LOG_D("(topic, partition, tablet): "
+ << "'" << key.Topic_ << "'"
+ << ", " << partition.GetPartitionId()
+ << ", " << partition.GetTabletId());
+
+ p->second.SetTabletId(partition.GetTabletId());
}
}
} else {
- builder << "The '" << JoinPath(result.Path) << "' topic is missing";
+ builder << "Topic '" << JoinPath(result.Path) << "' is missing";
status = Ydb::StatusIds::SCHEME_ERROR;
message = std::move(builder);
@@ -148,11 +325,46 @@ bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNa
return true;
}
-void TOffsetsInfo::Merge(const TOffsetsInfo& rhs)
+void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs)
+{
+ for (auto& [_, operations] : Operations_) {
+ operations.BuildTopicTxs(txs);
+ }
+}
+
+void TTopicOperations::Merge(const TTopicOperations& rhs)
{
- for (auto& [name, topic] : rhs.Topics_) {
- Topics_[name].Merge(topic);
+ for (auto& [key, value] : rhs.Operations_) {
+ Operations_[key].Merge(value);
}
+
+ HasReadOperations_ |= rhs.HasReadOperations_;
+ HasWriteOperations_ |= rhs.HasWriteOperations_;
+}
+
+TSet<ui64> TTopicOperations::GetReceivingTabletIds() const
+{
+ TSet<ui64> ids;
+ for (auto& [_, operations] : Operations_) {
+ if (operations.HasWriteOperations()) {
+ ids.insert(operations.GetTabletId());
+ }
+ }
+ return ids;
+}
+
+TSet<ui64> TTopicOperations::GetSendingTabletIds() const
+{
+ TSet<ui64> ids;
+ for (auto& [_, operations] : Operations_) {
+ ids.insert(operations.GetTabletId());
+ }
+ return ids;
+}
+
+size_t TTopicOperations::GetSize() const
+{
+ return Operations_.size();
}
}
diff --git a/ydb/core/kqp/common/kqp_topic.h b/ydb/core/kqp/common/kqp_topic.h
index e319c441dc7..70219893655 100644
--- a/ydb/core/kqp/common/kqp_topic.h
+++ b/ydb/core/kqp/common/kqp_topic.h
@@ -2,6 +2,7 @@
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/ydb_topic.pb.h>
+#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -15,47 +16,107 @@
#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
-#include <vector>
-
namespace NKikimr::NKqp::NTopic {
class TOffsetsRangeIntersectExpection : public yexception {
};
-struct TPartition {
- void AddRange(const Ydb::Topic::OffsetsRange &range);
- void SetTabletId(ui64 value);
+class TConsumerOperations {
+public:
+ bool IsValid() const;
+
+ std::pair<ui64, ui64> GetRange() const;
- void Merge(const TPartition& rhs);
+ ui64 GetBegin() const;
+ ui64 GetEnd() const;
+
+ void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range);
+ void Merge(const TConsumerOperations& rhs);
private:
- void AddRangeImpl(ui64 begin, ui64 end);
+ void AddOperationImpl(const TString& consumer,
+ ui64 begin, ui64 end);
+ TMaybe<TString> Consumer_;
TDisjointIntervalTree<ui64> Offsets_;
- ui64 TabletId_ = Max<ui64>();
};
-struct TTopic {
- TPartition& AddPartition(ui32 id);
- TPartition* GetPartition(ui32 id);
+class TTopicPartitionOperations {
+public:
+ bool IsValid() const;
+
+ void AddOperation(const TString& topic, ui32 partition,
+ const TString& consumer,
+ const Ydb::Topic::OffsetsRange& range);
+ void AddOperation(const TString& topic, ui32 partition);
+
+ void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs);
- void Merge(const TTopic& rhs);
+ void Merge(const TTopicPartitionOperations& rhs);
- THashMap<ui32, TPartition> Partitions_;
+ void SetTabletId(ui64 value);
+ ui64 GetTabletId() const;
+
+ bool HasReadOperations() const;
+ bool HasWriteOperations() const;
+
+private:
+ TMaybe<TString> Topic_;
+ TMaybe<ui32> Partition_;
+ THashMap<TString, TConsumerOperations> Operations_;
+ bool HasWriteOperations_ = false;
+ TMaybe<ui64> TabletId_;
};
-struct TOffsetsInfo {
- TTopic& AddTopic(const TString &path);
- TTopic* GetTopic(const TString &path);
+struct TTopicPartition {
+ struct THash {
+ size_t operator()(const TTopicPartition& x) const;
+ };
+
+ TTopicPartition(TString topic, ui32 partition);
+
+ bool operator==(const TTopicPartition& x) const;
- void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const;
+ TString Topic_;
+ ui32 Partition_;
+};
+
+class TTopicOperations {
+public:
+ bool IsValid() const;
+
+ bool HasOperations() const;
+ bool HasReadOperations() const;
+ bool HasWriteOperations() const;
+
+ bool TabletHasReadOperations(ui64 tabletId) const;
+
+ void AddOperation(const TString& topic, ui32 partition,
+ const TString& consumer,
+ const Ydb::Topic::OffsetsRange& range);
+ void AddOperation(const TString& topic, ui32 partition);
+
+ void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate,
+ TMaybe<TString> consumer);
bool ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results,
Ydb::StatusIds_StatusCode& status,
TString& message);
- void Merge(const TOffsetsInfo& rhs);
+ void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs);
+
+ void Merge(const TTopicOperations& rhs);
+
+ TSet<ui64> GetReceivingTabletIds() const;
+ TSet<ui64> GetSendingTabletIds() const;
+
+ size_t GetSize() const;
+
+private:
+ THashMap<TTopicPartition, TTopicPartitionOperations, TTopicPartition::THash> Operations_;
+ bool HasReadOperations_ = false;
+ bool HasWriteOperations_ = false;
- THashMap<TString, TTopic> Topics_;
+ TMaybe<TString> Consumer_;
};
}
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h
index 93e85ab7b33..a9b1face3b0 100644
--- a/ydb/core/kqp/common/kqp_transform.h
+++ b/ydb/core/kqp/common/kqp_transform.h
@@ -241,10 +241,6 @@ public:
ForceNewEngineSettings.ForceNewEngineLevel = level;
}
- void MergeTopicOffsets(const NTopic::TOffsetsInfo &offsets) {
- TopicOffsets.Merge(offsets);
- }
-
public:
struct TParamsState : public TThrRefBase {
TParamValueMap Values;
@@ -265,7 +261,7 @@ public:
TKqpTxLocks Locks;
TDeferredEffects DeferredEffects;
- NTopic::TOffsetsInfo TopicOffsets;
+ NTopic::TTopicOperations TopicOperations;
TIntrusivePtr<TParamsState> ParamsState;
IKqpGateway::TKqpSnapshotHandle SnapshotHandle;
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 07135d24a18..d6255e721a1 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -22,6 +22,7 @@
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/persqueue/events/global.h>
#include <ydb/library/yql/dq/runtime/dq_columns_resolve.h>
#include <ydb/library/yql/dq/tasks/dq_connection_builder.h>
@@ -196,6 +197,7 @@ private:
hFunc(TEvDataShard::TEvProposeTransactionResult, HandlePrepare);
hFunc(TEvDataShard::TEvProposeTransactionRestart, HandleExecute);
hFunc(TEvDataShard::TEvProposeTransactionAttachResult, HandlePrepare);
+ hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandlePrepare); // from CA
hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA
@@ -214,6 +216,21 @@ private:
ReportEventElapsedTime();
}
+ void HandlePrepare(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
+ NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record;
+
+ LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: "
+ << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));
+
+ TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin());
+ YQL_ENSURE(state);
+
+ YQL_ENSURE(event.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::ERROR);
+
+ auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_ABORTED);
+ ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issue);
+ }
+
void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) {
TEvDataShard::TEvProposeTransactionResult* res = ev->Get();
const ui64 shardId = res->GetOrigin();
@@ -529,6 +546,10 @@ private:
auto& affectedSet = *transaction.MutableAffectedSet();
affectedSet.Reserve(static_cast<int>(ShardStates.size()));
+ //
+ // TODO(abcdef): учесть таблетки топиков
+ //
+
ui64 aggrMinStep = 0;
ui64 aggrMaxStep = Max<ui64>();
ui64 totalReadSize = 0;
@@ -564,6 +585,10 @@ private:
}
}
+ //
+ // TODO(abcdef): учесть таблетки топиков
+ //
+
item.SetFlags(affectedFlags);
}
@@ -597,6 +622,7 @@ private:
hFunc(TEvDataShard::TEvProposeTransactionResult, HandleExecute);
hFunc(TEvDataShard::TEvProposeTransactionRestart, HandleExecute);
hFunc(TEvDataShard::TEvProposeTransactionAttachResult, HandleExecute);
+ hFunc(TEvPersQueue::TEvProposeTransactionResult, HandleExecute);
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
@@ -613,6 +639,21 @@ private:
ReportEventElapsedTime();
}
+ void HandleExecute(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
+ NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record;
+
+ LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: "
+ << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));
+
+ TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin());
+ YQL_ENSURE(state);
+
+ YQL_ENSURE(event.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::ERROR);
+
+ auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_ABORTED);
+ ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issue);
+ }
+
void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) {
TEvDataShard::TEvProposeTransactionResult* res = ev->Get();
const ui64 shardId = res->GetOrigin();
@@ -932,6 +973,12 @@ private:
LOG_D("Datashard " << x.first << " not finished yet: " << ToString(x.second.State));
}
}
+ for (auto& x : TopicTabletStates) {
+ if (x.second.State != TShardState::EState::Finished) {
+ ++notFinished;
+ LOG_D("TopicTablet " << x.first << " not finished yet: " << ToString(x.second.State));
+ }
+ }
if (notFinished == 0 && PendingComputeActors.empty()) {
Finalize();
return;
@@ -948,6 +995,11 @@ private:
sb << "DS " << shardId << " (" << ToString(shardState.State) << "), ";
}
}
+ for (auto& [tabletId, tabletState] : TopicTabletStates) {
+ if (tabletState.State != TShardState::EState::Finished) {
+ sb << "PQ " << tabletId << " (" << ToString(tabletState.State) << "), ";
+ }
+ }
LOG_D(sb);
}
}
@@ -1308,7 +1360,7 @@ private:
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
RequestControls.Reqister(TlsActivationContext->AsActorContext());
- ReadOnlyTx = true;
+ ReadOnlyTx = !Request.TopicOperations.HasOperations();
auto& funcRegistry = *AppData()->FunctionRegistry;
NMiniKQL::TScopedAlloc alloc(TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators());
@@ -1507,10 +1559,12 @@ private:
return;
}
- auto datashardTxs = BuildDatashardTxs(datashardTasks);
+ TTopicTabletTxs topicTxs;
+ auto datashardTxs = BuildDatashardTxs(datashardTasks, topicTxs);
// Single-shard transactions are always immediate
- ImmediateTx = datashardTxs.size() <= 1;
+ ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize()) <= 1;
+
switch (Request.IsolationLevel) {
// OnlineRO with AllowInconsistentReads = true
case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED:
@@ -1541,6 +1595,7 @@ private:
if (forceSnapshot) {
ComputeTasks = std::move(computeTasks);
DatashardTxs = std::move(datashardTxs);
+ TopicTxs = std::move(topicTxs);
auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
@@ -1558,7 +1613,8 @@ private:
if (prepareTasksSpan) {
prepareTasksSpan.End();
}
- ContinueExecute(computeTasks, datashardTxs);
+
+ ContinueExecute(computeTasks, datashardTxs, topicTxs);
}
STATEFN(WaitSnapshotState) {
@@ -1589,15 +1645,21 @@ private:
auto computeTasks = std::move(ComputeTasks);
auto datashardTxs = std::move(DatashardTxs);
- ContinueExecute(computeTasks, datashardTxs);
+ auto topicTxs = std::move(TopicTxs);
+
+ ContinueExecute(computeTasks, datashardTxs, topicTxs);
}
- void ContinueExecute(
- TVector<NDqProto::TDqTask>& computeTasks,
- THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>& datashardTxs)
+ using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>;
+ using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TKqpTransaction>;
+
+ void ContinueExecute(TVector<NDqProto::TDqTask>& computeTasks,
+ TDatashardTxs& datashardTxs,
+ TTopicTabletTxs& topicTxs)
{
UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
- if (datashardTxs.size() > 1) {
+
+ if (!ImmediateTx) {
// Followers only allowed for single shard transactions.
// (legacy behaviour, for compatibility with current execution engine)
UseFollowers = false;
@@ -1616,7 +1678,7 @@ private:
//Stats->ComputeStats.reserve(computeTasks.size());
}
- Execute(computeTasks, datashardTxs);
+ Execute(computeTasks, datashardTxs, topicTxs);
if (ImmediateTx) {
LOG_T("Immediate tx, become ExecuteState");
@@ -1635,8 +1697,9 @@ private:
}
}
- THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> BuildDatashardTxs(const THashMap<ui64, TVector<NDqProto::TDqTask>>& datashardTasks) {
- THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> datashardTxs;
+ TDatashardTxs BuildDatashardTxs(const THashMap<ui64, TVector<NDqProto::TDqTask>>& datashardTasks,
+ TTopicTabletTxs& topicTxs) {
+ TDatashardTxs datashardTxs;
for (auto& [shardId, tasks]: datashardTasks) {
auto& dsTxs = datashardTxs[shardId];
@@ -1645,8 +1708,11 @@ private:
}
}
- if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty()) {
+ Request.TopicOperations.BuildTopicTxs(topicTxs);
+
+ if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty() || Request.TopicOperations.HasReadOperations()) {
YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks);
+
auto locksOp = Request.ValidateLocks && Request.EraseLocks
? NKikimrTxDataShard::TKqpLocks::Commit
: (Request.ValidateLocks
@@ -1660,6 +1726,8 @@ private:
taskShardIds.insert(shardId);
}
}
+
+ taskShardIds.merge(Request.TopicOperations.GetReceivingTabletIds());
}
TSet<ui64> locksSendingShards;
@@ -1671,26 +1739,48 @@ private:
tx.MutableLocks()->MutableLocks()->Add()->Swap(&lock);
}
- if (!locksList.empty() && Request.ValidateLocks) {
+ if ((!locksList.empty() || Request.TopicOperations.HasReadOperations()) && Request.ValidateLocks) {
locksSendingShards.insert(shardId);
}
}
+ locksSendingShards.merge(Request.TopicOperations.GetSendingTabletIds());
+
if (Request.ValidateLocks) {
NProtoBuf::RepeatedField<ui64> sendingShards(locksSendingShards.begin(), locksSendingShards.end());
NProtoBuf::RepeatedField<ui64> receivingShards(taskShardIds.begin(), taskShardIds.end());
+
for (auto& [shardId, shardTx] : datashardTxs) {
shardTx.MutableLocks()->SetOp(locksOp);
shardTx.MutableLocks()->MutableSendingShards()->CopyFrom(sendingShards);
shardTx.MutableLocks()->MutableReceivingShards()->CopyFrom(receivingShards);
}
+
+ for (auto& [_, tx] : topicTxs) {
+ switch (locksOp) {
+ case NKikimrTxDataShard::TKqpLocks::Commit:
+ tx.SetOp(NKikimrPQ::TKqpTransaction::Commit);
+ break;
+ case NKikimrTxDataShard::TKqpLocks::Validate:
+ tx.SetOp(NKikimrPQ::TKqpTransaction::Validate);
+ break;
+ case NKikimrTxDataShard::TKqpLocks::Rollback:
+ tx.SetOp(NKikimrPQ::TKqpTransaction::Rollback);
+ break;
+ default:
+ break;
+ }
+
+ tx.MutableSendingShards()->CopyFrom(sendingShards);
+ tx.MutableReceivingShards()->CopyFrom(receivingShards);
+ }
}
}
return datashardTxs;
}
- void Execute(TVector<NDqProto::TDqTask>& computeTasks, THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>& datashardTxs) {
+ void Execute(TVector<NDqProto::TDqTask>& computeTasks, TDatashardTxs& datashardTxs, TTopicTabletTxs& topicTxs) {
auto lockTxId = Request.AcquireLocksTxId;
if (lockTxId.Defined() && *lockTxId == 0) {
lockTxId = TxId;
@@ -1756,6 +1846,8 @@ private:
ExecuteDatashardTransaction(shardId, shardTx, lockTxId);
}
+ ExecuteTopicTabletTransactions(topicTxs);
+
if (sendTasksSpan) {
sendTasksSpan.End();
}
@@ -1763,6 +1855,7 @@ private:
LOG_I("Total tasks: " << TasksGraph.GetTasks().size()
<< ", readonly: " << ReadOnlyTx
<< ", datashardTxs: " << datashardTxs.size()
+ << ", topicTxs: " << Request.TopicOperations.GetSize()
<< ", immediate: " << ImmediateTx
<< ", useFollowers: " << UseFollowers);
@@ -1773,9 +1866,56 @@ private:
CollectTaskChannelsUpdates(task, updates);
}
PropagateChannelsUpdates(updates);
+
CheckExecutionComplete();
}
+ void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) {
+ auto lockTxId = Request.AcquireLocksTxId;
+ if (lockTxId.Defined() && *lockTxId == 0) {
+ lockTxId = TxId;
+ }
+
+ for (auto& tx : topicTxs) {
+ auto tabletId = tx.first;
+ auto& transaction = tx.second;
+
+ auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
+
+ if (lockTxId) {
+ transaction.SetLockTxId(*lockTxId);
+ }
+ transaction.SetImmediate(ImmediateTx);
+
+ ActorIdToProto(SelfId(), ev->Record.MutableSource());
+ ev->Record.MutableTxBody()->Swap(&transaction);
+ ev->Record.SetTxId(TxId);
+
+ auto traceId = ExecuterSpan.GetTraceId();
+ LOG_D("ExecuteTopicTabletTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
+
+ LOG_D("Executing KQP transaction on topic tablet: " << tabletId
+ << ", lockTxId: " << lockTxId);
+
+ Send(MakePipePeNodeCacheID(UseFollowers),
+ new TEvPipeCache::TEvForward(ev.release(), tabletId, true),
+ 0,
+ 0,
+ std::move(traceId));
+
+ TShardState state;
+ state.State =
+ ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
+ state.DatashardState.ConstructInPlace();
+ state.DatashardState->Follower = UseFollowers;
+
+ state.DatashardState->ShardReadLocks = Request.TopicOperations.TabletHasReadOperations(tabletId);
+
+ auto result = TopicTabletStates.emplace(tabletId, std::move(state));
+ YQL_ENSURE(result.second);
+ }
+ }
+
void Finalize() {
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1929,6 +2069,7 @@ private:
NTxProxy::TRequestControls RequestControls;
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;
+ THashMap<ui64, TShardState> TopicTabletStates;
TVector<NKikimrTxDataShard::TLock> Locks;
TVector<TKqpExecuterTxResult> Results;
bool ReadOnlyTx = true;
@@ -1948,7 +2089,8 @@ private:
// Temporary storage during snapshot acquisition
TVector<NDqProto::TDqTask> ComputeTasks;
- THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> DatashardTxs;
+ TDatashardTxs DatashardTxs;
+ TTopicTabletTxs TopicTxs;
// Lock handle for a newly acquired lock
TLockHandle LockHandle;
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index 250299766d2..583bcd6011c 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -13,6 +13,7 @@
#include <ydb/core/base/wilson.h>
#include <ydb/core/base/kikimr_issue.h>
#include <ydb/core/protos/tx_datashard.pb.h>
+#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/kqp/executer/kqp_tasks_graph.h>
#include <ydb/core/kqp/kqp.h>
#include <ydb/core/grpc_services/local_rate_limiter.h>
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 37916452c8b..28ea418d34e 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -98,7 +98,7 @@ struct TKqpQueryState {
TString TxId_Human = "";
bool Commit = false;
- NTopic::TOffsetsInfo Offsets;
+ NTopic::TTopicOperations TopicOperations;
};
struct TKqpCleanupCtx {
@@ -424,24 +424,40 @@ public:
return;
}
- QueryState->Offsets = NTopic::TOffsetsInfo();
+ YQL_ENSURE(QueryState->Request.HasTopicOperations());
- for (auto& topic : QueryState->Request.topics()) {
- auto path = CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->Request.GetDatabase(), topic.path()));
- auto& t = QueryState->Offsets.AddTopic(path);
+ const NKikimrKqp::TTopicOperations& operations = QueryState->Request.GetTopicOperations();
- for (auto& partition : topic.partitions()) {
- auto& p = t.AddPartition(partition.partition_id());
+ TMaybe<TString> consumer;
+ if (operations.HasConsumer()) {
+ consumer = operations.GetConsumer();
+ }
+
+ QueryState->TopicOperations = NTopic::TTopicOperations();
+
+ for (auto& topic : operations.GetTopics()) {
+ auto path =
+ CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->Request.GetDatabase(), topic.path()));
- for (auto& range : partition.partition_offsets()) {
- p.AddRange(range);
+ for (auto& partition : topic.partitions()) {
+ if (partition.partition_offsets().empty()) {
+ QueryState->TopicOperations.AddOperation(path, partition.partition_id());
+ } else {
+ for (auto& range : partition.partition_offsets()) {
+ YQL_ENSURE(consumer.Defined());
+
+ QueryState->TopicOperations.AddOperation(path, partition.partition_id(),
+ *consumer,
+ range);
+ }
}
}
}
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
navigate->DatabaseName = CanonizePath(QueryState->Request.GetDatabase());
- QueryState->Offsets.FillSchemeCacheNavigate(*navigate);
+ QueryState->TopicOperations.FillSchemeCacheNavigate(*navigate,
+ std::move(consumer));
navigate->UserToken = new NACLib::TUserToken(QueryState->UserToken);
Become(&TKqpSessionActor::TopicOpsState);
@@ -961,6 +977,23 @@ public:
return true;
}
+ bool CheckTopicOperations() {
+ auto& txCtx = *QueryState->TxCtx;
+
+ if (txCtx.TopicOperations.IsValid()) {
+ return true;
+ }
+
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ std::vector<TIssue> issues {
+ YqlIssue({}, TIssuesIds::KIKIMR_BAD_REQUEST, "Incorrect offset ranges in the transaction.")
+ };
+ ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "incorrect offset ranges in the tx",
+ MessageFromIssues(issues));
+
+ return false;
+ }
+
void ExecuteOrDefer() {
if (!(QueryState->PreparedQuery
&& QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()))
@@ -991,7 +1024,7 @@ public:
break;
}
}
- if (!CheckTransacionLocks()) {
+ if (!CheckTransacionLocks() || !CheckTopicOperations()) {
return;
}
@@ -1014,6 +1047,10 @@ public:
LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit
<< " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size());
+ if (!CheckTopicOperations()) {
+ return true;
+ }
+
// TODO Handle timeouts -- request.Timeout, request.CancelAfter
if (tx) {
@@ -1029,7 +1066,8 @@ public:
request.Transactions.emplace_back(tx, PrepareParameters(*tx));
} else {
YQL_ENSURE(commit);
- if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks()) {
+
+ if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) {
ReplySuccess();
return true;
}
@@ -1039,6 +1077,7 @@ public:
for (const auto& effect : txCtx.DeferredEffects) {
YQL_ENSURE(!effect.Node);
YQL_ENSURE(effect.PhysicalTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA);
+
request.Transactions.emplace_back(effect.PhysicalTx, GetParamsRefMap(effect.Params));
LOG_D("TExecPhysicalRequest, add DeferredEffect to Transaction,"
@@ -1049,9 +1088,11 @@ public:
request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef();
}
- if (txCtx.Locks.HasLocks()) {
- request.ValidateLocks = !(txCtx.GetSnapshot().IsValid() && txCtx.DeferredEffects.Empty());
+ if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasReadOperations()) {
+ request.ValidateLocks = !(txCtx.GetSnapshot().IsValid() && txCtx.DeferredEffects.Empty()) ||
+ txCtx.TopicOperations.HasReadOperations();
request.EraseLocks = true;
+
LOG_D("TExecPhysicalRequest, tx has locks, ValidateLocks: " << request.ValidateLocks
<< " EraseLocks: " << request.EraseLocks);
@@ -1059,6 +1100,8 @@ public:
request.Locks.emplace_back(lock.GetValueRef(txCtx.Locks.LockType));
}
}
+
+ request.TopicOperations = std::move(txCtx.TopicOperations);
} else if (ShouldAcquireLocks(query)) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
}
@@ -1069,7 +1112,9 @@ public:
request.Transactions.size(),
request.Locks.size(),
request.AcquireLocksTxId.Defined());
+
SendToExecuter(std::move(request));
+
return false;
}
@@ -2081,14 +2126,14 @@ private:
ythrow TRequestFail(requestInfo, Ydb::StatusIds::SCHEME_ERROR) << message;
}
- QueryState->Offsets.ProcessSchemeCacheNavigate(response->ResultSet,
- status,
- message);
+ QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet,
+ status,
+ message);
if (status != Ydb::StatusIds::SUCCESS) {
ythrow TRequestFail(requestInfo, status) << message;
}
- if (!TryMergeTopicOffsets(QueryState->Offsets, message)) {
+ if (!TryMergeTopicOffsets(QueryState->TopicOperations, message)) {
ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << message;
}
@@ -2145,10 +2190,10 @@ private:
return true;
}
- bool TryMergeTopicOffsets(const NTopic::TOffsetsInfo &offsets, TString& message) {
+ bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message) {
try {
YQL_ENSURE(QueryState);
- QueryState->TxCtx->MergeTopicOffsets(offsets);
+ QueryState->TxCtx->TopicOperations.Merge(operations);
return true;
} catch (const NTopic::TOffsetsRangeIntersectExpection &ex) {
message = ex.what();
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index e1957e7a16f..5ea9600c343 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -8,7 +8,6 @@
#include <ydb/core/protos/msgbus.pb.h>
#include <ydb/public/api/protos/draft/persqueue_common.pb.h>
-
namespace NKikimr {
struct TEvPersQueue {
@@ -43,6 +42,9 @@ struct TEvPersQueue {
EvGetPartitionIdForWrite,
EvGetPartitionIdForWriteResponse,
EvReportPartitionError,
+ EvProposeTransaction,
+ EvProposeTransactionResult,
+ EvCancelTransactionProposal,
EvResponse = EvRequest + 256,
EvInternalEvents = EvResponse + 256,
EvEnd
@@ -218,5 +220,14 @@ struct TEvPersQueue {
{}
};
+ struct TEvProposeTransaction : public TEventPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
+ };
+
+ struct TEvProposeTransactionResult : public TEventPB<TEvProposeTransactionResult, NKikimrPQ::TEvProposeTransactionResult, EvProposeTransactionResult> {
+ };
+
+ struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> {
+ };
+
};
} //NKikimr
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index d242264ffc2..9160db6f6c7 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -2154,6 +2154,40 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) {
ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup());
}
+void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
+{
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction. topicPath=" << TopicPath);
+
+ NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
+ const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody();
+
+ for (auto& operation : txBody.GetOperations()) {
+ Y_VERIFY(!operation.HasPath() || (operation.GetPath() == TopicPath));
+
+ bool isWriteOperation = !operation.HasBegin();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "tx=" << event.GetTxId() <<
+ ", lock_tx_id=" << txBody.GetLockTxId() <<
+ ", path=" << operation.GetPath() <<
+ ", partition=" << operation.GetPartitionId() <<
+ ", consumer=" << operation.GetConsumer() <<
+ ", begin=" << operation.GetBegin() <<
+ ", end=" << operation.GetEnd() <<
+ ", is_write=" << isWriteOperation);
+ }
+
+ auto result = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
+
+ result->Record.SetOrigin(TabletID());
+ result->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ERROR);
+ result->Record.SetTxId(event.GetTxId());
+ //result->Record.SetMinStep();
+ //result->Record.SetMaxStep();
+ //result->Record.SetStep();
+
+ ctx.Send(ActorIdFromProto(event.GetSource()), result.release());
+}
bool TPersQueue::HandleHook(STFUNC_SIG)
{
@@ -2181,6 +2215,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvPQ::TEvError, Handle);
HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+ HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
default:
return false;
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 07ba60620c6..73fa4f3cd0b 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -34,6 +34,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void HandleWakeup(const TActorContext&);
+ void Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext&);
+
void InitResponseBuilder(const ui64 responseCookie, const ui32 count, const ui32 counterId);
void Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext&);
void Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&);
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index bb647ed258c..836057d074d 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -73,6 +73,11 @@ message TKqlSettings {
optional bool NewEngine = 6;
};
+message TTopicOperations {
+ optional string Consumer = 1;
+ repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 2;
+}
+
message TQueryRequest {
optional bytes SessionId = 1;
optional string Query = 2;
@@ -95,7 +100,7 @@ message TQueryRequest {
reserved 19; // (deprecated) StatsMode
optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated
optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21;
- repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 22;
+ optional TTopicOperations TopicOperations = 22;
}
message TKqpPathIdProto {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 466b72ef1b9..f4fb341596c 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -739,3 +739,56 @@ message TYdsShardIterator {
required uint64 CreationTimestampMs = 6;
optional ETopicKind Kind = 7;
}
+
+message TPartitionOperation {
+ required uint64 PartitionId = 1;
+ optional uint64 Begin = 2;
+ optional uint64 End = 3;
+ optional string Consumer = 4;
+ required string Path = 5; // topic path
+};
+
+message TKqpTransaction {
+ enum ELocksOp {
+ Unspecified = 0;
+ Validate = 1;
+ Commit = 2;
+ Rollback = 3;
+ }
+
+ repeated TPartitionOperation Operations = 1;
+ required ELocksOp Op = 2;
+ repeated uint64 SendingShards = 3;
+ repeated uint64 ReceivingShards = 4;
+ required bool Immediate = 5;
+ optional fixed64 LockTxId = 6;
+}
+
+message TEvProposeTransaction {
+ required NActorsProto.TActorId Source = 1;
+ required TKqpTransaction TxBody = 2;
+ required uint64 TxId = 3;
+};
+
+message TEvProposeTransactionResult {
+ enum EStatus {
+ PREPARED = 1;
+ COMPLETE = 2;
+ ABORTED = 3;
+ ERROR = 4;
+ LOCKS_BROKEN = 5;
+ CANCELLED = 6;
+ BAD_REQUEST = 7;
+ };
+
+ required uint64 Origin = 1; // Tablet Id
+ required EStatus Status = 2;
+ required uint64 TxId = 3;
+ optional uint64 MinStep = 4;
+ optional uint64 MaxStep = 6;
+ optional uint64 Step = 7;
+};
+
+message TEvCancelTransactionProposal {
+ required uint64 TxId = 1;
+};
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 4501e3fee78..f310bf16eef 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -509,6 +509,8 @@ message AddOffsetsToTransactionRequest {
// Ranges of offsets by topics.
repeated TopicOffsets topics = 4;
+ string consumer = 5;
+
message TopicOffsets {
// Topic path.
string path = 1;
diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
index b6d1e4933b0..5c962458e80 100644
--- a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
@@ -29,9 +29,6 @@ void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx)
return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
}
- //
- // TODO: новый тип запроса. например, QUERY_TYPE_TOPIC
- //
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED);
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_TOPIC);
@@ -57,11 +54,8 @@ void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx)
//
ev->Record.MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control());
-
- //
- // скопировать информацию о смещениях
- //
- *ev->Record.MutableRequest()->MutableTopics() = req->Gettopics();
+ ev->Record.MutableRequest()->MutableTopicOperations()->SetConsumer(req->consumer());
+ *ev->Record.MutableRequest()->MutableTopicOperations()->MutableTopics() = req->topics();
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
}
diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
index 089a960073f..1390f3b7071 100644
--- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
+++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
@@ -97,6 +97,7 @@ void AppendTopic(const TTopic &t,
Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_id,
const TString& tx_id,
+ const TString& consumer,
const TVector<TTopic>& topics)
{
Ydb::Topic::AddOffsetsToTransactionRequest request;
@@ -104,6 +105,8 @@ Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_
request.set_session_id(session_id);
request.mutable_tx_control()->set_tx_id(tx_id);
+ request.set_consumer(consumer);
+
for (auto& t : topics) {
AppendTopic(t, request.mutable_topics());
}
@@ -140,10 +143,15 @@ protected:
, NKikimrServices::TX_PROXY_SCHEME_CACHE
, NKikimrServices::KQP_PROXY
, NKikimrServices::PERSQUEUE
+ , NKikimrServices::KQP_EXECUTER
, NKikimrServices::KQP_SESSION}, NActors::NLog::PRI_DEBUG);
auto partsCount = 5u;
- server->AnnoyingClient->CreateTopicNoLegacy(VALID_TOPIC_PATH, partsCount);
+ server->AnnoyingClient->CreateTopicNoLegacy(VALID_TOPIC_PATH, partsCount,
+ true,
+ true,
+ Nothing(),
+ {"c0nsumer", "consumer-1", "consumer-2"});
NACLib::TDiffACL acl;
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::DescribeSchema, AUTH_TOKEN);
@@ -164,7 +172,8 @@ protected:
stub = CreateTopicServiceTxStub(*server);
}
- Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics) {
+ Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics,
+ const TString& consumer = "c0nsumer") {
grpc::ClientContext rcontext;
rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN);
rcontext.AddMetadata("x-ydb-database", DATABASE);
@@ -172,7 +181,8 @@ protected:
Ydb::Topic::AddOffsetsToTransactionResponse response;
grpc::Status status = stub->AddOffsetsToTransaction(&rcontext,
- CreateRequest(session->GetId(), tx->GetId(), topics),
+ CreateRequest(session->GetId(), tx->GetId(),
+ consumer, topics),
&response);
UNIT_ASSERT(status.ok());
@@ -204,7 +214,7 @@ protected:
}
};
-Y_UNIT_TEST_F(TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) {
+Y_UNIT_TEST_F(OneConsumer_TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) {
Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
@@ -228,7 +238,7 @@ Y_UNIT_TEST_F(TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
}
-Y_UNIT_TEST_F(TheRangesOverlap, TAddOffsetToTransactionFixture) {
+Y_UNIT_TEST_F(OneConsumer_TheRangesOverlap, TAddOffsetToTransactionFixture) {
Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
TPartition{.Id=4, .Offsets={
@@ -252,6 +262,41 @@ Y_UNIT_TEST_F(TheRangesOverlap, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST);
}
+Y_UNIT_TEST_F(DifferentConsumers_TheRangesOverlap, TAddOffsetToTransactionFixture) {
+ Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=1, .End=3},
+ TOffsetRange{.Begin=5, .End=8}
+ }},
+ TPartition{.Id=1, .Offsets={
+ TOffsetRange{.Begin=2, .End=6}
+ }}
+ }}
+ }, "consumer-1");
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=4, .End=7}
+ }}
+ }}
+ }, "consumer-2");
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+}
+
+Y_UNIT_TEST_F(UnknownConsumer, TAddOffsetToTransactionFixture) {
+ auto response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=4, .End=7}
+ }}
+ }}
+ }, "unknown-consumer");
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+}
+
Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) {
auto response = CallAddOffsetsToTransaction({
TTopic{.Path=INVALID_TOPIC_PATH, .Partitions={
@@ -295,6 +340,59 @@ Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAUTHORIZED);
}
+Y_UNIT_TEST_F(ThereAreGapsInTheOffsetRanges, TAddOffsetToTransactionFixture) {
+ auto response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=1, .Offsets={
+ TOffsetRange{.Begin=0, .End=2},
+ TOffsetRange{.Begin=4, .End=6}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ auto result = tx->Commit().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
+}
+
+Y_UNIT_TEST_F(OnePartitionAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) {
+ auto response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=1, .Offsets={
+ TOffsetRange{.Begin=0, .End=2}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ Cerr << "<<< CommitTx <<<" << Endl;
+ auto result = tx->Commit().ExtractValueSync();
+ Cerr << ">>> CommitTx >>>" << Endl;
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
+}
+
+Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TAddOffsetToTransactionFixture) {
+ auto response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=1, .Offsets={
+ TOffsetRange{.Begin=0, .End=2}
+ }},
+ TPartition{.Id=2, .Offsets={
+ TOffsetRange{.Begin=0, .End=4}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ Cerr << "<<< CommitTx <<<" << Endl;
+ auto result = tx->Commit().ExtractValueSync();
+ Cerr << ">>> CommitTx >>>" << Endl;
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
+}
+
}
}