aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-07-17 11:13:01 +0300
committerabcdef <akotov@ydb.tech>2023-07-17 11:13:01 +0300
commit9cdf0064f0438ac4c9eb64bcfc707101df43c7ca (patch)
treea2527bc1fdadc7742085218598d00542509e1105
parent7dfa215b0e36a9e0e1558162543aaa81123f9ec0 (diff)
downloadydb-9cdf0064f0438ac4c9eb64bcfc707101df43c7ca.tar.gz
new GetEvents and GetEvent methods
новые методы GetEvents и GetEvent
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp104
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h33
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h54
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp322
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ya.make1
15 files changed, 538 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt
index efe49cc4b5..afd2882e39 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.darwin-x86_64.txt
@@ -29,6 +29,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC
api-grpc
api-grpc-draft
api-protos
+ cpp-client-ydb_table
)
target_sources(cpp-client-ydb_topic PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt
index 3de6140a52..e62148bcc9 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-aarch64.txt
@@ -30,6 +30,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC
api-grpc
api-grpc-draft
api-protos
+ cpp-client-ydb_table
)
target_sources(cpp-client-ydb_topic PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt
index 3de6140a52..e62148bcc9 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.linux-x86_64.txt
@@ -30,6 +30,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC
api-grpc
api-grpc-draft
api-protos
+ cpp-client-ydb_table
)
target_sources(cpp-client-ydb_topic PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt
index efe49cc4b5..afd2882e39 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/CMakeLists.windows-x86_64.txt
@@ -29,6 +29,7 @@ target_link_libraries(cpp-client-ydb_topic PUBLIC
api-grpc
api-grpc-draft
api-protos
+ cpp-client-ydb_table
)
target_sources(cpp-client-ydb_topic PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index 8868748dcc..00520152e0 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -125,6 +125,21 @@ TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(bool block, TMaybe<si
return res;
}
+TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(const TReadSessionGetEventSettings& settings)
+{
+ auto events = GetEvents(settings.Block_, settings.MaxEventsCount_, settings.MaxByteSize_);
+ if (!events.empty() && settings.Tx_) {
+ auto& tx = settings.Tx_->get();
+ for (auto& event : events) {
+ if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
+ CollectOffsets(tx, *dataEvent);
+ }
+ }
+ UpdateOffsets(tx);
+ }
+ return events;
+}
+
TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxByteSize) {
auto res = EventsQueue->GetEvent(block, maxByteSize);
if (EventsQueue->IsClosed()) {
@@ -133,6 +148,95 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB
return res;
}
+TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(const TReadSessionGetEventSettings& settings)
+{
+ auto event = GetEvent(settings.Block_, settings.MaxByteSize_);
+ if (event) {
+ auto& tx = settings.Tx_->get();
+ if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ CollectOffsets(tx, *dataEvent);
+ }
+ UpdateOffsets(tx);
+ }
+ return event;
+}
+
+void TReadSession::CollectOffsets(NTable::TTransaction& tx,
+ const TReadSessionEvent::TDataReceivedEvent& event)
+{
+ const auto& session = *event.GetPartitionSession();
+
+ if (event.HasCompressedMessages()) {
+ for (auto& message : event.GetCompressedMessages()) {
+ CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset());
+ }
+ } else {
+ for (auto& message : event.GetMessages()) {
+ CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset());
+ }
+ }
+}
+
+void TReadSession::CollectOffsets(NTable::TTransaction& tx,
+ const TString& topicPath, ui32 partitionId, ui64 offset)
+{
+ const TString& sessionId = tx.GetSession().GetId();
+ const TString& txId = tx.GetId();
+ TOffsetRanges& ranges = OffsetRanges[std::make_pair(sessionId, txId)];
+ ranges[topicPath][partitionId].InsertInterval(offset, offset + 1);
+}
+
+void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
+{
+ const TString& sessionId = tx.GetSession().GetId();
+ const TString& txId = tx.GetId();
+
+ auto p = OffsetRanges.find(std::make_pair(sessionId, txId));
+ if (p == OffsetRanges.end()) {
+ return;
+ }
+
+ TVector<TTopicOffsets> topics;
+ for (auto& [path, partitions] : p->second) {
+ TTopicOffsets topic;
+ topic.Path = path;
+
+ topics.push_back(std::move(topic));
+
+ for (auto& [id, ranges] : partitions) {
+ TPartitionOffsets partition;
+ partition.PartitionId = id;
+
+ TTopicOffsets& t = topics.back();
+ t.Partitions.push_back(std::move(partition));
+
+ for (auto& range : ranges) {
+ TPartitionOffsets& p = t.Partitions.back();
+
+ TOffsetsRange r;
+ r.Start = range.first;
+ r.End = range.second;
+
+ p.Offsets.push_back(r);
+ }
+ }
+ }
+
+ Y_VERIFY(!topics.empty());
+
+ auto result = Client->UpdateOffsetsInTransaction(tx,
+ topics,
+ Settings.ConsumerName_,
+ {}).GetValueSync();
+ Y_VERIFY(!result.IsTransportError());
+
+ if (!result.IsSuccess()) {
+ ythrow yexception() << "error on update offsets: " << result;
+ }
+
+ OffsetRanges.erase(std::make_pair(sessionId, txId));
+}
+
bool TReadSession::Close(TDuration timeout) {
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
// Log final counters.
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
index e49bc9bdd6..064a395a69 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
@@ -31,6 +31,12 @@ public:
return {};
}
+ TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) override {
+ return GetEvents(settings.Block_,
+ settings.MaxEventsCount_,
+ settings.MaxByteSize_);
+ }
+
inline TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override {
Y_VERIFY(false);
@@ -39,6 +45,11 @@ public:
return {};
}
+ TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) override {
+ return GetEvent(settings.Block_,
+ settings.MaxByteSize_);
+ }
+
inline bool Close(TDuration timeout) override {
Y_VERIFY(false);
@@ -72,8 +83,13 @@ public:
void Start();
NThreading::TFuture<void> WaitEvent() override;
- TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override;
- TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override;
+ TVector<TReadSessionEvent::TEvent> GetEvents(bool block,
+ TMaybe<size_t> maxEventsCount,
+ size_t maxByteSize) override;
+ TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) override;
+ TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block,
+ size_t maxByteSize) override;
+ TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) override;
bool Close(TDuration timeout) override;
@@ -113,6 +129,19 @@ private:
void AbortImpl(EStatus statusCode, const TString& message, NPersQueue::TDeferredActions<false>& deferred);
private:
+ using TOffsetRanges = THashMap<TString, THashMap<ui64, TDisjointIntervalTree<ui64>>>;
+
+ void CollectOffsets(NTable::TTransaction& tx,
+ const TReadSessionEvent::TDataReceivedEvent& event);
+ void CollectOffsets(NTable::TTransaction& tx,
+ const TString& topicPath, ui32 partitionId, ui64 offset);
+ void UpdateOffsets(const NTable::TTransaction& tx);
+
+ //
+ // (session, tx) -> topic -> partition -> (begin, end)
+ //
+ THashMap<std::pair<TString, TString>, TOffsetRanges> OffsetRanges;
+
TReadSessionSettings Settings;
const TString SessionId;
const TInstant StartSessionTime = TInstant::Now();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
index d893dc4fe2..5c42758df9 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
@@ -14,6 +14,25 @@
namespace NYdb::NTopic {
+struct TOffsetsRange {
+ ui64 Start;
+ ui64 End;
+};
+
+struct TPartitionOffsets {
+ ui64 PartitionId;
+ TVector<TOffsetsRange> Offsets;
+};
+
+struct TTopicOffsets {
+ TString Path;
+ TVector<TPartitionOffsets> Partitions;
+};
+
+struct TUpdateOffsetsInTransactionSettings : public TOperationRequestSettings<TUpdateOffsetsInTransactionSettings> {
+ using TOperationRequestSettings<TUpdateOffsetsInTransactionSettings>::TOperationRequestSettings;
+};
+
class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
public:
// Constructor for main client.
@@ -246,6 +265,41 @@ public:
TRpcRequestSettings::Make(settings));
}
+ TAsyncStatus UpdateOffsetsInTransaction(const NTable::TTransaction& tx,
+ const TVector<TTopicOffsets>& topics,
+ const TString& consumerName,
+ const TUpdateOffsetsInTransactionSettings& settings)
+ {
+ auto request = MakeOperationRequest<Ydb::Topic::UpdateOffsetsInTransactionRequest>(settings);
+
+ request.mutable_tx()->set_id(tx.GetId());
+ request.mutable_tx()->set_session(tx.GetSession().GetId());
+
+ for (auto& t : topics) {
+ auto* topic = request.mutable_topics()->Add();
+ topic->set_path(t.Path);
+
+ for (auto& p : t.Partitions) {
+ auto* partition = topic->mutable_partitions()->Add();
+ partition->set_partition_id(p.PartitionId);
+
+ for (auto& r : p.Offsets) {
+ auto *range = partition->mutable_partition_offsets()->Add();
+ range->set_start(r.Start);
+ range->set_end(r.End);
+ }
+ }
+ }
+
+ request.set_consumer(consumerName);
+
+ return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::UpdateOffsetsInTransactionRequest, Ydb::Topic::UpdateOffsetsInTransactionResponse>(
+ std::move(request),
+ &Ydb::Topic::V1::TopicService::Stub::AsyncUpdateOffsetsInTransaction,
+ TRpcRequestSettings::Make(settings)
+ );
+ }
+
// Runtime API.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 751ca9d7cf..6e9874cd6d 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -24,6 +24,10 @@ namespace NYdb {
namespace NScheme {
struct TPermissions;
}
+
+ namespace NTable {
+ class TTransaction;
+ }
}
namespace NYdb::NTopic {
@@ -1535,6 +1539,15 @@ public:
virtual ~IWriteSession() = default;
};
+struct TReadSessionGetEventSettings : public TCommonClientSettingsBase<TReadSessionGetEventSettings> {
+ using TSelf = TReadSessionGetEventSettings;
+
+ FLUENT_SETTING_DEFAULT(bool, Block, false);
+ FLUENT_SETTING_OPTIONAL(size_t, MaxEventsCount);
+ FLUENT_SETTING_DEFAULT(size_t, MaxByteSize, std::numeric_limits<size_t>::max());
+ FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx);
+};
+
class IReadSession {
public:
//! Main reader loop.
@@ -1554,10 +1567,14 @@ public:
virtual TVector<TReadSessionEvent::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(),
size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0;
+ virtual TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) = 0;
+
//! Get single event.
virtual TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block = false,
size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0;
+ virtual TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) = 0;
+
//! Close read session.
//! Waits for all commit acknowledgments to arrive.
//! Force close after timeout.
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
index 7d8e23430c..d257f7bc7f 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
@@ -42,6 +42,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
index 614f64ec48..2f974cb1f0 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
@@ -45,6 +45,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
index eebe4771ce..ac1d1c13bf 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
@@ -46,6 +46,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
index 701581fd9b..0105455a74 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
@@ -35,6 +35,7 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
new file mode 100644
index 0000000000..ce9c3f88ef
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -0,0 +1,322 @@
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
+
+#include <library/cpp/logger/stream.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYdb::NTopic::NTests {
+
+Y_UNIT_TEST_SUITE(TxUsage) {
+
+NKikimr::Tests::TServerSettings MakeServerSettings()
+{
+ auto settings = PQSettings(0);
+ settings.SetDomainName("Root");
+ settings.SetEnableTopicServiceTx(true);
+ settings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ settings.PQConfig.SetRoot("/Root");
+ settings.PQConfig.SetDatabase("/Root");
+ return settings;
+}
+
+class TEnvironment {
+public:
+ TEnvironment();
+
+ void CreateTopic(const TString& path, const TString& consumer);
+
+ TString GetEndpoint() const;
+ TString GetTopicPath(const TString& name) const;
+ TString GetTopicParent() const;
+ TString GetDatabase() const;
+
+ const TDriver& GetDriver();
+
+private:
+ TString Database;
+ ::NPersQueue::TTestServer Server;
+ TMaybe<TDriver> Driver;
+};
+
+TEnvironment::TEnvironment() :
+ Database("/Root"),
+ Server(MakeServerSettings(), false)
+{
+ Server.StartServer(true, GetDatabase());
+}
+
+void TEnvironment::CreateTopic(const TString& path, const TString& consumer)
+{
+ NTopic::TTopicClient client(GetDriver());
+
+ NTopic::TCreateTopicSettings topics;
+ NTopic::TConsumerSettings<NTopic::TCreateTopicSettings> consumers(topics, consumer);
+ topics.AppendConsumers(consumers);
+
+ auto status = client.CreateTopic(path, topics).GetValueSync();
+ UNIT_ASSERT(status.IsSuccess());
+}
+
+TString TEnvironment::GetEndpoint() const
+{
+ return "localhost:" + ToString(Server.GrpcPort);
+}
+
+TString TEnvironment::GetTopicPath(const TString& name) const
+{
+ return GetTopicParent() + "/" + name;
+}
+
+TString TEnvironment::GetTopicParent() const
+{
+ return GetDatabase();
+}
+
+TString TEnvironment::GetDatabase() const
+{
+ return Database;
+}
+
+const TDriver& TEnvironment::GetDriver()
+{
+ if (!Driver) {
+ TDriverConfig config;
+ config.SetEndpoint(GetEndpoint());
+ config.SetDatabase(GetDatabase());
+ config.SetAuthToken("root@builtin");
+ config.SetLog(MakeHolder<TStreamLogBackend>(&Cerr));
+
+ Driver.ConstructInPlace(config);
+ }
+
+ return *Driver;
+}
+
+class TFixture : public NUnitTest::TBaseFixture {
+protected:
+ void SetUp(NUnitTest::TTestContext&) override;
+
+ NTable::TSession CreateSession();
+ NTable::TTransaction BeginTx(NTable::TSession& session);
+ void CommitTx(NTable::TTransaction& tx, EStatus status);
+
+ using TTopicReadSession = NTopic::IReadSession;
+ using TTopicReadSessionPtr = std::shared_ptr<TTopicReadSession>;
+
+ TTopicReadSessionPtr CreateReader();
+
+ void StartPartitionSession(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset);
+ void StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset);
+
+ void ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset);
+
+ void WriteMessage(const TString& data);
+
+private:
+ TString GetTopicPath() const;
+ TString GetTopicName() const;
+ TString GetConsumerName() const;
+ TString GetMessageGroupId() const;
+
+ template<class E>
+ E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx);
+ template<class E>
+ E ReadEvent(TTopicReadSessionPtr reader);
+
+ const TDriver& GetDriver();
+
+ std::shared_ptr<TEnvironment> Env;
+ TMaybe<TDriver> Driver;
+};
+
+void TFixture::SetUp(NUnitTest::TTestContext&)
+{
+ Env = std::make_shared<TEnvironment>();
+
+ Env->CreateTopic(GetTopicPath(), GetConsumerName());
+}
+
+NTable::TSession TFixture::CreateSession()
+{
+ NTable::TTableClient client(GetDriver());
+ auto result = client.CreateSession().ExtractValueSync();
+ return result.GetSession();
+}
+
+NTable::TTransaction TFixture::BeginTx(NTable::TSession& session)
+{
+ auto result = session.BeginTransaction().ExtractValueSync();
+ return result.GetTransaction();
+}
+
+void TFixture::CommitTx(NTable::TTransaction& tx, EStatus status)
+{
+ auto result = tx.Commit().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
+}
+
+auto TFixture::CreateReader() -> TTopicReadSessionPtr
+{
+ NTopic::TTopicClient client(GetDriver());
+ TReadSessionSettings options;
+ options.ConsumerName(GetConsumerName());
+ options.AppendTopics(GetTopicPath());
+ return client.CreateReadSession(options);
+}
+
+void TFixture::StartPartitionSession(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset)
+{
+ auto event = ReadEvent<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(reader, tx);
+ UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), offset);
+ event.Confirm();
+}
+
+void TFixture::StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset)
+{
+ auto event = ReadEvent<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(reader);
+ UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), offset);
+ event.Confirm();
+}
+
+void TFixture::ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset)
+{
+ auto event = ReadEvent<NTopic::TReadSessionEvent::TDataReceivedEvent>(reader, tx);
+ UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset);
+}
+
+template<class E>
+E TFixture::ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx)
+{
+ NTopic::TReadSessionGetEventSettings options;
+ options.Block(true);
+ options.MaxEventsCount(1);
+ options.Tx(tx);
+
+ auto event = reader->GetEvent(options);
+ UNIT_ASSERT(event);
+
+ auto ev = std::get_if<E>(&*event);
+ UNIT_ASSERT(ev);
+
+ return *ev;
+}
+
+template<class E>
+E TFixture::ReadEvent(TTopicReadSessionPtr reader)
+{
+ auto event = reader->GetEvent(true, 1);
+ UNIT_ASSERT(event);
+
+ auto ev = std::get_if<E>(&*event);
+ UNIT_ASSERT(ev);
+
+ return *ev;
+}
+
+void TFixture::WriteMessage(const TString& data)
+{
+ NTopic::TWriteSessionSettings options;
+ options.Path(GetTopicPath());
+ options.MessageGroupId(GetMessageGroupId());
+
+ NTopic::TTopicClient client(GetDriver());
+ auto session = client.CreateSimpleBlockingWriteSession(options);
+ UNIT_ASSERT(session->Write(data));
+ session->Close();
+}
+
+TString TFixture::GetTopicPath() const
+{
+ return Env->GetTopicPath(GetTopicName());
+}
+
+TString TFixture::GetTopicName() const
+{
+ return "my-topic";
+}
+
+TString TFixture::GetConsumerName() const
+{
+ return "my-consumer";
+}
+
+TString TFixture::GetMessageGroupId() const
+{
+ return "my-message-group";
+}
+
+const TDriver& TFixture::GetDriver()
+{
+ return Env->GetDriver();
+}
+
+Y_UNIT_TEST_F(SessionAbort, TFixture)
+{
+ {
+ auto reader = CreateReader();
+ auto session = CreateSession();
+ auto tx = BeginTx(session);
+
+ StartPartitionSession(reader, tx, 0);
+
+ WriteMessage("message #0");
+ ReadMessage(reader, tx, 0);
+
+ WriteMessage("message #1");
+ ReadMessage(reader, tx, 1);
+ }
+
+ {
+ auto session = CreateSession();
+ auto tx = BeginTx(session);
+ auto reader = CreateReader();
+
+ StartPartitionSession(reader, tx, 0);
+
+ ReadMessage(reader, tx, 0);
+
+ CommitTx(tx, EStatus::SUCCESS);
+ }
+
+ {
+ auto reader = CreateReader();
+
+ StartPartitionSession(reader, 2);
+ }
+}
+
+Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture)
+{
+ WriteMessage("message #0");
+
+ auto session1 = CreateSession();
+ auto tx1 = BeginTx(session1);
+
+ {
+ auto reader = CreateReader();
+
+ StartPartitionSession(reader, tx1, 0);
+
+ ReadMessage(reader, tx1, 0);
+ }
+
+ auto session2 = CreateSession();
+ auto tx2 = BeginTx(session2);
+
+ {
+ auto reader = CreateReader();
+
+ StartPartitionSession(reader, tx2, 0);
+
+ ReadMessage(reader, tx2, 0);
+ }
+
+ CommitTx(tx2, EStatus::SUCCESS);
+ CommitTx(tx1, EStatus::ABORTED);
+}
+
+}
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
index 98728614f6..b1f2718022 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
@@ -33,6 +33,7 @@ SRCS(
describe_topic_ut.cpp
managed_executor.h
managed_executor.cpp
+ topic_to_table_ut.cpp
)
END()
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ya.make
index 2c096421b7..190f7ab041 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ya.make
@@ -17,6 +17,7 @@ PEERDIR(
ydb/public/api/grpc
ydb/public/api/grpc/draft
ydb/public/api/protos
+ ydb/public/sdk/cpp/client/ydb_table
)
END()