aboutsummaryrefslogtreecommitdiffstats
path: root/kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.cpp
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-04-14 13:10:53 +0300
committerkomels <komels@yandex-team.ru>2022-04-14 13:10:53 +0300
commit21c9b0e6b039e9765eb414c406c2b86e8cea6850 (patch)
treef40ebc18ff8958dfbd189954ad024043ca983ea5 /kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.cpp
parent9a4effa852abe489707139c2b260dccc6f4f9aa9 (diff)
downloadydb-21c9b0e6b039e9765eb414c406c2b86e8cea6850.tar.gz
Final part on compatibility layer: LOGBROKER-7215
ref:777c67aadbf705d19034a09a792b2df61ba53697
Diffstat (limited to 'kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.cpp')
-rw-r--r--kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.cpp263
1 files changed, 263 insertions, 0 deletions
diff --git a/kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.cpp b/kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.cpp
new file mode 100644
index 0000000000..6f201f39f5
--- /dev/null
+++ b/kikimr/persqueue/sdk/deprecated/cpp/v2/ut_utils/data_writer.cpp
@@ -0,0 +1,263 @@
+#include "data_writer.h"
+
+namespace NPersQueue::NTests {
+using namespace NKikimr;
+
+void TPQDataWriter::Read(
+ const TString& topic, const TString& clientId, const TString& ticket, bool error, bool checkACL,
+ bool useBatching, bool onlyCreate
+) {
+
+ grpc::ClientContext context;
+
+ auto stream = StubP_->ReadSession(&context);
+ UNIT_ASSERT(stream);
+
+ // Send initial request.
+ TReadRequest req;
+ TReadResponse resp;
+
+ req.MutableInit()->AddTopics(topic);
+
+ req.MutableInit()->SetClientId(clientId);
+ req.MutableInit()->SetProxyCookie(ProxyCookie);
+ if (!ticket.empty()) {
+ req.MutableCredentials()->SetTvmServiceTicket(ticket);
+ }
+
+ if (useBatching) {
+ req.MutableInit()->SetProtocolVersion(TReadRequest::Batching);
+ }
+
+ if (!stream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ //TODO[komels]: why this leads to timeout?
+ //Server.AnnoyingClient->GetClientInfo({topic}, clientId, true);
+ UNIT_ASSERT(stream->Read(&resp));
+ if (error) {
+ UNIT_ASSERT(resp.HasError());
+ return;
+ }
+ UNIT_ASSERT_C(resp.HasInit(), resp);
+
+ if (onlyCreate)
+ return;
+
+ for (ui32 i = 0; i < 11; ++i) {
+ TReadRequest req;
+
+ req.MutableRead()->SetMaxCount(1);
+
+ if (!stream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ Server.AnnoyingClient->AlterTopic(FullTopicName, i < 10 ? 2 : 3);
+
+ }
+
+ if (checkACL) {
+ NACLib::TDiffACL acl;
+ acl.RemoveAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, clientId + "@" BUILTIN_ACL_DOMAIN);
+ Server.AnnoyingClient->ModifyACL("/Root/PQ", FullTopicName, acl.SerializeAsString());
+
+ TReadRequest req;
+ req.MutableRead()->SetMaxCount(1);
+ if (!stream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+
+ UNIT_ASSERT(stream->Read(&resp));
+ UNIT_ASSERT(resp.HasError() && resp.GetError().GetCode() == NPersQueue::NErrorCode::EErrorCode::ACCESS_DENIED);
+ return;
+ }
+ Server.AnnoyingClient->GetClientInfo({FullTopicName}, clientId, true);
+ for (ui32 i = 0; i < 11; ++i) {
+ TReadResponse resp;
+
+ UNIT_ASSERT(stream->Read(&resp));
+
+ if (useBatching) {
+ UNIT_ASSERT(resp.HasBatchedData());
+ UNIT_ASSERT_VALUES_EQUAL(resp.GetBatchedData().PartitionDataSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resp.GetBatchedData().GetPartitionData(0).BatchSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resp.GetBatchedData().GetPartitionData(0).GetBatch(0).MessageDataSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resp.GetBatchedData().GetPartitionData(0).GetBatch(0).GetMessageData(0).GetOffset(), i);
+ } else {
+ UNIT_ASSERT(resp.HasData());
+ UNIT_ASSERT_VALUES_EQUAL(resp.GetData().MessageBatchSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resp.GetData().GetMessageBatch(0).MessageSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resp.GetData().GetMessageBatch(0).GetMessage(0).GetOffset(), i);
+ }
+ }
+ //TODO: check here that read will never done UNIT_ASSERT(!stream->Read(&resp));
+ {
+ for (ui32 i = 1; i < 11; ++i) {
+ TReadRequest req;
+
+ req.MutableCommit()->AddCookie(i);
+
+ if (!stream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ }
+ ui32 i = 1;
+ while (i <= 10) {
+ TReadResponse resp;
+
+ UNIT_ASSERT(stream->Read(&resp));
+ Cerr << resp << "\n";
+ UNIT_ASSERT(resp.HasCommit());
+ UNIT_ASSERT(resp.GetCommit().CookieSize() > 0);
+ for (ui32 j = 0; j < resp.GetCommit().CookieSize(); ++j) {
+ UNIT_ASSERT( resp.GetCommit().GetCookie(j) == i);
+ ++i;
+ UNIT_ASSERT(i <= 11);
+ }
+ }
+ Server.AnnoyingClient->GetClientInfo({FullTopicName}, clientId, true);
+ }
+}
+
+void TPQDataWriter::WaitWritePQServiceInitialization() {
+ TWriteRequest req;
+ TWriteResponse resp;
+ while (true) {
+ grpc::ClientContext context;
+
+ auto stream = StubP_->WriteSession(&context);
+ UNIT_ASSERT(stream);
+ req.MutableInit()->SetTopic(ShortTopicName);
+ req.MutableInit()->SetSourceId("12345678");
+ req.MutableInit()->SetProxyCookie(ProxyCookie);
+
+ if (!stream->Write(req)) {
+ continue;
+ }
+ UNIT_ASSERT(stream->Read(&resp));
+ if (resp.GetError().GetCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ Sleep(TDuration::MilliSeconds(50));
+ } else {
+ break;
+ }
+ }
+}
+
+ui32 TPQDataWriter::InitSession(const TString& sourceId, ui32 pg, bool success, ui32 step) {
+ TWriteRequest req;
+ TWriteResponse resp;
+
+ grpc::ClientContext context;
+
+ auto stream = StubP_->WriteSession(&context);
+
+ UNIT_ASSERT(stream);
+ req.MutableInit()->SetTopic(ShortTopicName);
+ req.MutableInit()->SetSourceId(sourceId);
+ req.MutableInit()->SetPartitionGroup(pg);
+ req.MutableInit()->SetProxyCookie(ProxyCookie);
+
+ UNIT_ASSERT(stream->Write(req));
+ UNIT_ASSERT(stream->Read(&resp));
+ Cerr << "Init result: " << resp << "\n";
+ //TODO: ensure topic creation - proxy already knows about new partitions, but tablet - no!
+ if (!success) {
+ UNIT_ASSERT(resp.HasError());
+ return 0;
+ } else {
+ if (!resp.HasInit() && step < 5) {
+ Sleep(TDuration::MilliSeconds(100));
+ return InitSession(sourceId, pg, success, step + 1);
+ }
+ UNIT_ASSERT(resp.HasInit());
+ return resp.GetInit().GetPartition();
+ }
+ return 0;
+}
+
+ui32 TPQDataWriter::WriteImpl(
+ const TString& topic, const TVector<TString>& data, bool error, const TString& ticket, bool batch
+) {
+ grpc::ClientContext context;
+
+ auto stream = StubP_->WriteSession(&context);
+ UNIT_ASSERT(stream);
+
+ // Send initial request.
+ TWriteRequest req;
+ TWriteResponse resp;
+
+ req.MutableInit()->SetTopic(topic);
+ req.MutableInit()->SetSourceId(SourceId_);
+ req.MutableInit()->SetProxyCookie(ProxyCookie);
+ if (!ticket.empty())
+ req.MutableCredentials()->SetTvmServiceTicket(ticket);
+ auto item = req.MutableInit()->MutableExtraFields()->AddItems();
+ item->SetKey("key");
+ item->SetValue("value");
+
+ if (!stream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+
+ ui32 part = 0;
+
+ UNIT_ASSERT(stream->Read(&resp));
+
+ if (!error) {
+ UNIT_ASSERT_C(resp.HasInit(), resp);
+ UNIT_ASSERT_C(!resp.GetInit().GetSessionId().empty(), resp);
+ part = resp.GetInit().GetPartition();
+ } else {
+ Cerr << resp << "\n";
+ UNIT_ASSERT(resp.HasError());
+ return 0;
+ }
+
+ // Send data requests.
+ Flush(data, stream, ticket, batch);
+
+ Flush(data, stream, ticket, batch);
+
+ Flush(data, stream, ticket, batch);
+
+ Flush(data, stream, ticket, batch);
+
+ //will cause only 4 answers in stream->Read - third call will fail, not blocks
+ stream->WritesDone();
+
+ UNIT_ASSERT(!stream->Read(&resp));
+
+ auto status = stream->Finish();
+ UNIT_ASSERT(status.ok());
+ return part;
+}
+
+ui64 TPQDataWriter::ReadCookieFromMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& meta) const {
+ auto ci = meta.find("cookie");
+ if (ci == meta.end()) {
+ ythrow yexception() << "server didn't provide the cookie";
+ } else {
+ return FromString<ui64>(TStringBuf(ci->second.data(), ci->second.size()));
+ }
+}
+
+void TPQDataWriter::InitializeChannel() {
+ Channel_ = grpc::CreateChannel("[::1]:" + ToString(Server.GrpcPort), grpc::InsecureChannelCredentials());
+ Stub_ = NKikimrClient::TGRpcServer::NewStub(Channel_);
+
+ grpc::ClientContext context;
+ NKikimrClient::TChooseProxyRequest request;
+ NKikimrClient::TResponse response;
+ auto status = Stub_->ChooseProxy(&context, request, &response);
+ UNIT_ASSERT(status.ok());
+ Cerr << response << "\n";
+ UNIT_ASSERT(response.GetStatus() == NMsgBusProxy::MSTATUS_OK);
+ ProxyCookie = response.GetProxyCookie();
+ Channel_ = grpc::CreateChannel(
+ "[" + response.GetProxyName() + "]:" + ToString(Server.GrpcPort),
+ grpc::InsecureChannelCredentials()
+ );
+ StubP_ = NPersQueue::PersQueueService::NewStub(Channel_);
+}
+} // namespace NKikimr::NPersQueueTests