diff options
| author | alexnick <[email protected]> | 2022-08-12 20:07:18 +0300 | 
|---|---|---|
| committer | alexnick <[email protected]> | 2022-08-12 20:07:18 +0300 | 
| commit | cc65f4da6b407e808692063c6ea6a9a910a41166 (patch) | |
| tree | ea7945941cf0f5a272779daea6c1dec76374a076 | |
| parent | 9d8ceae328ea2efda6fe8d5480b434e22ad3ba95 (diff) | |
reduce KQP requests count
+
| -rw-r--r-- | contrib/restricted/boost/hana/CONTRIBUTING.md | 5 | ||||
| -rw-r--r-- | contrib/restricted/boost/hana/README.md | 23 | ||||
| -rw-r--r-- | contrib/restricted/boost/hana/RELEASE_NOTES.md | 2 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.cpp | 33 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.h | 2 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 4 | 
6 files changed, 33 insertions, 36 deletions
| diff --git a/contrib/restricted/boost/hana/CONTRIBUTING.md b/contrib/restricted/boost/hana/CONTRIBUTING.md index 403fae5c603..52d0858a6aa 100644 --- a/contrib/restricted/boost/hana/CONTRIBUTING.md +++ b/contrib/restricted/boost/hana/CONTRIBUTING.md @@ -18,7 +18,10 @@ you contribute:     running the tests:     ```shell -   make check +   mkdir build +   cd build +   cmake .. +   cmake --build . --target check     ```  5. Commit your changes. Your commit message should start with a one line     short description of the modifications, with the details and explanations diff --git a/contrib/restricted/boost/hana/README.md b/contrib/restricted/boost/hana/README.md index b4ef11bb35c..93364118b70 100644 --- a/contrib/restricted/boost/hana/README.md +++ b/contrib/restricted/boost/hana/README.md @@ -171,21 +171,13 @@ Please see [LICENSE.md](LICENSE.md).  ## Releasing -This section acts as a reminder of the few simple steps required to release a -new version of the library. This is only relevant to Hana's developers. To -release a new version of the library, make sure the current version in -`include/boost/hana/version.hpp` matches the release you're about to publish. -Then, create an annotated tag with: -```sh -git tag -a --file=- v<version> <<EOM -...your message here... -EOM -``` - -Then, push the tag and create a new GitHub release pointing to that tag. -Once that is done, bump the version number in `include/boost/hana/version.hpp` -so that it matches the next _planned_ release. Finally, do not forget to update -the [Homebrew formula][] to point to the latest version. +To release a new version of Hana, use the `util/release.sh` script. The script +will merge `develop` to `master`, create a tag on `master` and then bump the +version on `develop`. The tag on `master` will be annotated with the contents +of the `RELEASE_NOTES.md` file. Once the `release.sh` script has been run, the +`master` and `develop` branches should be pushed manually, as well as the tag +that was created on `master`. Finally, create a GitHub release pointing to the +new tag on `master`.  <!-- Links --> @@ -199,4 +191,3 @@ the [Homebrew formula][] to point to the latest version.  [eRuby]: http://en.wikipedia.org/wiki/ERuby  [Hana.docs]: http://boostorg.github.io/hana  [Hana.wiki]: https://github.com/boostorg/hana/wiki -[Homebrew formula]: https://github.com/Homebrew/homebrew-core/blob/master/Formula/hana.rb diff --git a/contrib/restricted/boost/hana/RELEASE_NOTES.md b/contrib/restricted/boost/hana/RELEASE_NOTES.md new file mode 100644 index 00000000000..c10c2a0a258 --- /dev/null +++ b/contrib/restricted/boost/hana/RELEASE_NOTES.md @@ -0,0 +1,2 @@ +Release notes for Hana 1.6.2 +============================ diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 35e39983a7b..acf3841435f 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -320,13 +320,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor  void TWriteSessionActor::InitAfterDiscovery(const TActorContext& ctx) {      try { -        // "Bad" hash - in legacy mode calculated from full name ("rt3...", not short name); -        // Here we had a bug for all the time being and now have to keep compatibility was invalid hashes -        // Generally GetTopicForSrcIdHash for encoding. Do not copy-paste this line; -        EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcId(), SourceId); - -        // Good hash and proper way of calcultion; -        CompatibleHash = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId).Hash; +        EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId);      } catch (yexception& e) {          CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(),  PersQueue::ErrorCode::BAD_REQUEST, ctx);          return; @@ -465,7 +459,6 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) {          ProceedPartition(partitionId, ctx);          return;      } -    SendSelectPartitionRequest(CompatibleHash, FullConverter->GetClientsideName(), ctx);      SendSelectPartitionRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx);      State = ES_WAIT_TABLE_REQUEST_1;  } @@ -475,11 +468,11 @@ void TWriteSessionActor::SendSelectPartitionRequest(ui32 hash, const TString& to      auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();      ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);      ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); -    ev->Record.MutableRequest()->SetKeepSession(false); +    ev->Record.MutableRequest()->SetKeepSession(true);      ev->Record.MutableRequest()->SetQuery(SelectSourceIdQuery);      ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));      // fill tx settings: set commit tx flag & begin new serializable tx. -    ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); +    ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false);      ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();      // keep compiled query in cache.      ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); @@ -552,6 +545,10 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const          SelectSrcIdsInflight--;          auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); +        TxId = record.GetResponse().GetTxMeta().id(); +        KqpSessionId = record.GetResponse().GetSessionId(); +        Y_VERIFY(!TxId.empty()); +          if (t.ListSize() != 0) {              auto& tt = t.GetList(0).GetStruct(0);              if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition @@ -592,6 +589,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const          }          return;      } else if (State == EState::ES_WAIT_TABLE_REQUEST_2) { +          SourceIdUpdatesInflight--;          if (!SourceIdUpdatesInflight) {              LastSourceIdUpdate = ctx.Now(); @@ -620,7 +618,12 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet      ev->Record.MutableRequest()->SetKeepSession(false);      // fill tx settings: set commit tx flag & begin new serializable tx.      ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); -    ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); +    if (TxId) { +        ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); +        ev->Record.MutableRequest()->SetSessionId(KqpSessionId); +    } else { +        ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); +    }      // keep compiled query in cache.      ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); @@ -639,12 +642,6 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet  void TWriteSessionActor::SendUpdateSrcIdsRequests(const TActorContext& ctx) {      { -        //full legacy name (rt3.dc--acc--topic) -        auto ev = MakeUpdateSourceIdMetadataRequest(CompatibleHash, FullConverter->GetClientsideName(), ctx); -        SourceIdUpdatesInflight++; -        ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); -    } -    {          auto ev = MakeUpdateSourceIdMetadataRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx);          SourceIdUpdatesInflight++;          ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); @@ -667,6 +664,8 @@ void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorCont      ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0; +    TxId = ""; +      if (!tabletId) {          CloseSession(                  Sprintf("no partition %u in topic '%s', Marker# PQ4", Partition, diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 5a7312f3b08..335b9e91189 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -238,6 +238,8 @@ private:      TString ClientDC;      TString SelectSourceIdQuery;      TString UpdateSourceIdQuery; +    TString TxId; +    TString KqpSessionId;      ui32 SelectSrcIdsInflight = 0;      ui64 MaxSrcIdAccessTime = 0; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 91683d5490b..ad1e4f4d4da 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4639,12 +4639,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {          server.AnnoyingClient->CreateTopic(legacyName, 100);          runTest(legacyName, shortLegacyName, topicName, srcId1, 5, 100); -        runTest(legacyName, legacyName, topicName, srcId2, 6, 100); +        runTest(legacyName, shortLegacyName, topicName, srcId2, 6, 100);          runTest("", "", topicName, srcId1, 5, 100);          runTest("", "", topicName, srcId2, 6, 100);          ui64 time = (TInstant::Now() + TDuration::Hours(4)).MilliSeconds(); -        runTest(legacyName, legacyName, topicName, srcId2, 7, time); +        runTest(legacyName, shortLegacyName, topicName, srcId2, 7, time);      }      Y_UNIT_TEST(TestReadPartitionStatus) { | 
