diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2022-03-27 21:49:53 +0300 | 
|---|---|---|
| committer | Ilnaz Nizametdinov <[email protected]> | 2022-03-27 21:49:53 +0300 | 
| commit | 760769f65e5243098c203338ffbae04aae57d61d (patch) | |
| tree | 36e69b83fe61482d0ce2977d948d59eda4c69921 | |
| parent | eba7afc473d087b21d8bfdfc1398f50fd8452aaf (diff) | |
PQ & YDS readable logs, tests KIKIMR-14198
ref:b4144cad998d0d2bd0f953e0c03a2456fd819164
28 files changed, 594 insertions, 128 deletions
| diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 26d76167eb9..b4969035b32 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -574,6 +574,7 @@ add_subdirectory(ydb/public/lib/deprecated/client)  add_subdirectory(ydb/public/lib/value)  add_subdirectory(ydb/library/yql/dq/actors/compute)  add_subdirectory(ydb/library/yql/dq/tasks) +add_subdirectory(ydb/services/lib/sharding)  add_subdirectory(ydb/core/tx/long_tx_service/public)  add_subdirectory(ydb/core/yq/libs/actors)  add_subdirectory(ydb/core/yq/libs/actors/logging) @@ -752,7 +753,6 @@ add_subdirectory(ydb/services/persqueue_v1)  add_subdirectory(ydb/library/persqueue/tests)  add_subdirectory(ydb/services/lib/actors)  add_subdirectory(ydb/public/lib/jwt) -add_subdirectory(ydb/services/lib/sharding)  add_subdirectory(ydb/core/cms)  add_subdirectory(ydb/core/driver_lib/base_utils)  add_subdirectory(ydb/core/driver_lib/cli_config_base) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index a80add5dfcf..3fba0407c0a 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -654,6 +654,7 @@ add_subdirectory(ydb/public/lib/deprecated/client)  add_subdirectory(ydb/public/lib/value)  add_subdirectory(ydb/library/yql/dq/actors/compute)  add_subdirectory(ydb/library/yql/dq/tasks) +add_subdirectory(ydb/services/lib/sharding)  add_subdirectory(ydb/core/tx/long_tx_service/public)  add_subdirectory(ydb/core/yq/libs/actors)  add_subdirectory(ydb/core/yq/libs/actors/logging) @@ -832,7 +833,6 @@ add_subdirectory(ydb/services/persqueue_v1)  add_subdirectory(ydb/library/persqueue/tests)  add_subdirectory(ydb/services/lib/actors)  add_subdirectory(ydb/public/lib/jwt) -add_subdirectory(ydb/services/lib/sharding)  add_subdirectory(ydb/core/cms)  add_subdirectory(ydb/core/driver_lib/base_utils)  add_subdirectory(ydb/core/driver_lib/cli_config_base) diff --git a/ydb/core/base/path.h b/ydb/core/base/path.h index c2b93d2bafd..402365d473f 100644 --- a/ydb/core/base/path.h +++ b/ydb/core/base/path.h @@ -30,4 +30,10 @@ TString CombinePath(TIter begin, TIter end, bool canonize = true) {          : path;  } +inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TString& childName) { +    auto path = parentPath; +    path.push_back(childName); +    return path; +} +  } diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index 6b752a60e81..d27e55b0d0b 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -215,11 +215,12 @@ private:      void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) {          LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics"); +        const auto& msg = *ev->Get(); -        SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ev->Get()->SyncVersion, ctx); +        SendSchemeCacheRequest(msg.Topics, !msg.PathPrefix.empty(), false, msg.SyncVersion, msg.ShowPrivate, ctx);          auto inserted = DescribeTopicsWaiters.insert(std::make_pair(                  RequestId, -                TWaiter{ev->Sender, std::move(ev->Get()->Topics)} +                TWaiter{ev->Sender, std::move(msg.Topics)}          )).second;          Y_VERIFY(inserted);      } @@ -252,7 +253,7 @@ private:          }          if (DescribeAllTopicsWaiters.empty()) {              LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request"); -            SendSchemeCacheRequest(CurrentTopics, true, true, false, ctx); +            SendSchemeCacheRequest(CurrentTopics, true, true, false, false, ctx);          }          LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter");          DescribeAllTopicsWaiters.push(waiter); @@ -262,6 +263,9 @@ private:      void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {          auto& result = ev->Get()->Request; +        LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle SchemeCache response" +            << ": result# " << result->ToString(*AppData()->TypeRegistry)); +          if (result->Instant == 0) {              for (const auto& entry : result->ResultSet) {                  if (!entry.PQGroupInfo) { @@ -296,10 +300,10 @@ private:          }      } - -    void SendSchemeCacheRequest( -            const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, const TActorContext& ctx -    ) { +    void SendSchemeCacheRequest(const TVector<TString>& topics, +            bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, bool showPrivate, +            const TActorContext& ctx) +    {          auto instant = isFullListingRequest ? 0 : ++RequestId;          auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant);          for (const auto& path : topics) { @@ -311,7 +315,8 @@ private:              entry.Path.insert(entry.Path.end(), split.begin(), split.end());              entry.SyncVersion = syncVersion; -            entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic; +            entry.ShowPrivatePath = showPrivate; +            entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;              schemeCacheRequest->ResultSet.emplace_back(std::move(entry));          }          ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release())); diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index f1d8248ef3f..74c9ac14184 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -66,18 +66,21 @@ struct TEvPqNewMetaCache {          TString PathPrefix;          TVector<TString> Topics;          bool SyncVersion; +        bool ShowPrivate = false;          TEvDescribeTopicsRequest() = default; -        explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true) +        explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true, bool showPrivate = false)              : Topics(topics)              , SyncVersion(syncVersion) +            , ShowPrivate(showPrivate)          {} -        TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true) +        TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true, bool showPrivate = false)              : PathPrefix(pathPrefix)              , Topics(topics)              , SyncVersion(syncVersion) +            , ShowPrivate(showPrivate)          {}      }; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 385b647c5ec..6085b914830 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1231,6 +1231,7 @@ message TModifyScheme {      optional EOperationType OperationType = 2;      optional bool Internal = 36 [default = false]; // internal operations are not generated directly by the user      optional bool FailOnExist = 50 [default = false]; // as a replacement for TEvModifySchemeTransaction.FailOnExist +    optional bool AllowAccessToPrivatePaths = 53 [default = false];      optional TMkDir MkDir = 3;      optional TTableDescription CreateTable = 4; diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 7c859274496..e0343e21409 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -691,10 +691,16 @@ message TYdsNextToken {  }  message TYdsShardIterator { +    enum ETopicKind { +        KIND_COMMON = 0; +        KIND_CDC = 1; +    } +      required string StreamName = 1;      required string StreamArn = 2;      required uint32 ShardId = 3;      required uint64 ReadTimestampMs = 4;      required uint32 SequenceNumber = 5;      required uint64 CreationTimestampMs = 6; +    optional ETopicKind Kind = 7;  } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 9623365974c..9b0c97c7311 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -875,6 +875,11 @@ namespace Tests {          return Runtime->GetAppData().FunctionRegistry;      } +    const NYdb::TDriver& TServer::GetDriver() const { +        Y_VERIFY(Driver); +        return *Driver; +    } +      TServer::~TServer() {          if (Runtime->GetAppData().Mon) {              Runtime->GetAppData().Mon->Stop(); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index b3673907ae4..30f3d13c11e 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -240,6 +240,7 @@ namespace Tests {          const TServerSettings& GetSettings() const;          const NScheme::TTypeRegistry* GetTypeRegistry();          const NMiniKQL::IFunctionRegistry* GetFunctionRegistry(); +        const NYdb::TDriver& GetDriver() const;          ui32 StaticNodes() const {              return Settings->NodeCount; diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 842cf3224bf..2880f39156e 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(core-tx-datashard PUBLIC    library-cpp-resource    cpp-actors-core    cpp-containers-flat_hash +  cpp-digest-md5    cpp-html-pcdata    library-cpp-json    cpp-json-yson @@ -44,6 +45,7 @@ target_link_libraries(core-tx-datashard PUBLIC    api-protos    lib-deprecated-kicli    dq-actors-compute +  services-lib-sharding  )  target_sources(core-tx-datashard PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -251,6 +253,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC    library-cpp-resource    cpp-actors-core    cpp-containers-flat_hash +  cpp-digest-md5    cpp-html-pcdata    library-cpp-json    cpp-json-yson @@ -277,6 +280,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC    api-protos    lib-deprecated-kicli    dq-actors-compute +  services-lib-sharding  )  target_sources(core-tx-datashard.global PRIVATE    ${CMAKE_BINARY_DIR}/ydb/core/tx/datashard/eacf331f0ffc35d4b482f1d15a887d3b.cpp diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index cce147d67f7..6f0f93e9f52 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -2,6 +2,7 @@  #include "export_common.h"  #include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h>  #include <library/cpp/json/yson/json2yson.h>  #include <library/cpp/string_utils/base64/base64.h> @@ -14,34 +15,6 @@  namespace NKikimr {  namespace NDataShard { -i64 TChangeRecord::GetSeqNo() const { -    Y_VERIFY(Order <= Max<i64>()); -    return static_cast<i64>(Order); -} - -TConstArrayRef<TCell> TChangeRecord::GetKey() const { -    if (Key) { -        return *Key; -    } - -    switch (Kind) { -        case EKind::AsyncIndex: -        case EKind::CdcDataChange: { -            NKikimrChangeExchange::TChangeRecord::TDataChange parsed; -            Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size())); - -            TSerializedCellVec key; -            Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key)); - -            Key.ConstructInPlace(key.GetCells()); -            break; -        } -    } - -    Y_VERIFY(Key); -    return *Key; -} -  void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const {      record.SetOrder(Order);      record.SetGroup(Group); @@ -134,7 +107,7 @@ static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeId type) {  static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key,      const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in)  { -    Y_VERIFY(in.TagsSize() != schema->KeyColumnIds.size()); +    Y_VERIFY(in.TagsSize() == schema->KeyColumnIds.size());      for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) {          Y_VERIFY(in.GetTags(i) == schema->KeyColumnIds.at(i));      } @@ -187,20 +160,25 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value                  SerializeJsonValue(Schema, value["newImage"], body.GetNewImage());              } -            if (!body.HasOldImage() && !body.HasNewImage()) { -                switch (body.GetRowOperationCase()) { -                    case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: +            const auto hasAnyImage = body.HasOldImage() || body.HasNewImage(); +            switch (body.GetRowOperationCase()) { +                case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: +                    value["update"].SetType(NJson::EJsonValueType::JSON_MAP); +                    if (!hasAnyImage) {                          SerializeJsonValue(Schema, value["update"], body.GetUpsert()); -                        break; -                    case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: +                    } +                    break; +                case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: +                    value["reset"].SetType(NJson::EJsonValueType::JSON_MAP); +                    if (!hasAnyImage) {                          SerializeJsonValue(Schema, value["reset"], body.GetReset()); -                        break; -                    case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: -                        value["erase"].SetType(NJson::EJsonValueType::JSON_MAP); -                        break; -                    default: -                        Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); -                } +                    } +                    break; +                case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: +                    value["erase"].SetType(NJson::EJsonValueType::JSON_MAP); +                    break; +                default: +                    Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));              }              break; @@ -212,6 +190,62 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value      }  } +TConstArrayRef<TCell> TChangeRecord::GetKey() const { +    if (Key) { +        return *Key; +    } + +    switch (Kind) { +        case EKind::AsyncIndex: +        case EKind::CdcDataChange: { +            NKikimrChangeExchange::TChangeRecord::TDataChange parsed; +            Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size())); + +            TSerializedCellVec key; +            Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key)); + +            Key.ConstructInPlace(key.GetCells()); +            break; +        } +    } + +    Y_VERIFY(Key); +    return *Key; +} + +i64 TChangeRecord::GetSeqNo() const { +    Y_VERIFY(Order <= Max<i64>()); +    return static_cast<i64>(Order); +} + +TString TChangeRecord::GetPartitionKey() const { +    if (PartitionKey) { +        return *PartitionKey; +    } + +    switch (Kind) { +        case EKind::CdcDataChange: { +            Y_VERIFY(Schema); + +            NKikimrChangeExchange::TChangeRecord::TDataChange body; +            Y_VERIFY(body.ParseFromArray(Body.data(), Body.size())); + +            NJson::TJsonValue key; +            SerializeJsonKey(Schema, key, body.GetKey()); + +            PartitionKey.ConstructInPlace(WriteJson(key, false)); +            break; +        } + +        case EKind::AsyncIndex: { +            Y_FAIL("Not supported"); +        } +    } + +    Y_VERIFY(PartitionKey); +    return *PartitionKey; +} +  TString TChangeRecord::ToString() const {      TString result;      TStringOutput out(result); diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index fae0f793732..1ee4259f1b9 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -36,8 +36,6 @@ public:      const TPathId& GetPathId() const { return PathId; }      EKind GetKind() const { return Kind; }      const TString& GetBody() const { return Body; } -    i64 GetSeqNo() const; -    TConstArrayRef<TCell> GetKey() const;      const TPathId& GetTableId() const { return TableId; }      ui64 GetSchemaVersion() const { return SchemaVersion; } @@ -45,6 +43,10 @@ public:      void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const;      void SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const; +    TConstArrayRef<TCell> GetKey() const; +    i64 GetSeqNo() const; +    TString GetPartitionKey() const; +      TString ToString() const;      void Out(IOutputStream& out) const; @@ -62,6 +64,7 @@ private:      TUserTable::TCPtr Schema;      mutable TMaybe<TOwnedCellVec> Key; +    mutable TMaybe<TString> PartitionKey;  }; // TChangeRecord diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 1bc6796dbf6..bb2d356fb72 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -613,11 +613,11 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe          return KeyDesc && KeyDesc->Partitions;      } -    ui64 GetPartitionId(TConstArrayRef<TCell> key) const override { +    ui64 GetPartitionId(const TChangeRecord& record) const override {          Y_VERIFY(KeyDesc);          Y_VERIFY(KeyDesc->Partitions); -        const auto range = TTableRange(key); +        const auto range = TTableRange(record.GetKey());          Y_VERIFY(range.Point);          TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 3533a4997d1..a526fd7dbe2 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -5,12 +5,14 @@  #include <library/cpp/actors/core/actor_bootstrapped.h>  #include <library/cpp/actors/core/hfunc.h>  #include <library/cpp/actors/core/log.h> +#include <library/cpp/digest/md5/md5.h>  #include <library/cpp/json/json_writer.h>  #include <ydb/core/persqueue/partition_key_range/partition_key_range.h>  #include <ydb/core/persqueue/writer/source_id_encoding.h>  #include <ydb/core/persqueue/writer/writer.h>  #include <ydb/core/protos/grpc_pq_old.pb.h> +#include <ydb/services/lib/sharding/sharding.h>  namespace NKikimr {  namespace NDataShard { @@ -18,7 +20,7 @@ namespace NDataShard {  using namespace NPQ;  class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderPartition> { -    static constexpr auto CodecRaw = 1; +    static constexpr auto CodecRaw = 0;      TStringBuf GetLogPrefix() const {          if (!LogPrefix) { @@ -616,28 +618,43 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>          return KeyDesc && KeyDesc->Partitions;      } -    ui64 GetPartitionId(TConstArrayRef<TCell> key) const override { +    ui64 GetPartitionId(const TChangeRecord& record) const override {          Y_VERIFY(KeyDesc);          Y_VERIFY(KeyDesc->Partitions); -        const auto range = TTableRange(key); -        Y_VERIFY(range.Point); - -        TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( -            KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true, -            [&](const TKeyDesc::TPartitionInfo& partition, bool) { -                const int compares = CompareBorders<true, false>( -                    partition.EndKeyPrefix.GetCells(), range.From, -                    partition.IsInclusive || partition.IsPoint, -                    range.InclusiveFrom || range.Point, KeyDesc->Schema +        switch (Format) { +            case NKikimrSchemeOp::ECdcStreamFormatProto: { +                const auto range = TTableRange(record.GetKey()); +                Y_VERIFY(range.Point); + +                TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( +                    KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true, +                    [&](const TKeyDesc::TPartitionInfo& partition, bool) { +                        const int compares = CompareBorders<true, false>( +                            partition.EndKeyPrefix.GetCells(), range.From, +                            partition.IsInclusive || partition.IsPoint, +                            range.InclusiveFrom || range.Point, KeyDesc->Schema +                        ); + +                        return (compares < 0); +                    }                  ); -                return (compares < 0); +                Y_VERIFY(it != KeyDesc->Partitions.end()); +                return it->PartitionId; +            } + +            case NKikimrSchemeOp::ECdcStreamFormatJson: { +                using namespace NKikimr::NDataStreams::V1; +                const auto hashKey = HexBytesToDecimal(MD5::Calc(record.GetPartitionKey())); +                return ShardFromDecimal(hashKey, KeyDesc->Partitions.size());              } -        ); -        Y_VERIFY(it != KeyDesc->Partitions.end()); -        return it->PartitionId; +            default: { +                Y_FAIL_S("Unknown format" +                    << ": format# " << static_cast<int>(Format)); +            } +        }      }      IActor* CreateSender(ui64 partitionId) override { diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index ee8576248b1..026c4033f28 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -115,7 +115,7 @@ void TBaseChangeSender::SendRecords() {      bool needToResolve = false;      while (it != PendingSent.end()) { -        const ui64 partitionId = Resolver->GetPartitionId(it->second.GetKey()); +        const ui64 partitionId = Resolver->GetPartitionId(it->second);          if (!Senders.contains(partitionId)) {              needToResolve = true;              ++it; diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index e807a468aad..efff4a722e0 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -75,7 +75,7 @@ public:      virtual void Resolve() = 0;      virtual bool IsResolving() const = 0;      virtual bool IsResolved() const = 0; -    virtual ui64 GetPartitionId(TConstArrayRef<TCell> key) const = 0; +    virtual ui64 GetPartitionId(const TChangeRecord& record) const = 0;  };  class TBaseChangeSender: public IChangeSender { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 3af5b4a2e5c..5c98f3f605e 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1,5 +1,9 @@  #include "datashard_ut_common.h" +#include <ydb/core/base/path.h> +#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +  #include <util/string/strip.h>  namespace NKikimr { @@ -567,6 +571,314 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {              opts.SetChangesQueueBytesLimit(1);          });      } -} + +} // AsyncIndexChangeExchange + +Y_UNIT_TEST_SUITE(Cdc) { +    using namespace NYdb::NPersQueue; +    using namespace NYdb::NDataStreams::V1; + +    using TCdcStream = TShardedTableOptions::TCdcStream; + +    template <typename TDerived, typename TClient> +    class TTestEnv { +    public: +        explicit TTestEnv( +            const TShardedTableOptions& tableDesc, +            const TCdcStream& streamDesc, +            const TString& root = "Root", +            const TString& tableName = "Table") +        { +            auto settings = TServerSettings(PortManager.GetPort(2134), {}, DefaultPQConfig()) +                .SetDomainName(root) +                .SetGrpcPort(PortManager.GetPort(2135)); + +            Server = new TServer(settings); +            Server->EnableGRpc(settings.GrpcPort); + +            const auto database = JoinPath({root}); +            auto& runtime = *Server->GetRuntime(); +            EdgeActor = runtime.AllocateEdgeActor(); + +            SetupLogging(runtime); +            InitRoot(Server, EdgeActor); + +            CreateShardedTable(Server, EdgeActor, database, tableName, tableDesc); +            WaitTxNotification(Server, EdgeActor, AsyncAlterAddStream(Server, database, tableName, streamDesc)); + +            Client = TDerived::MakeClient(Server->GetDriver(), database); +        } + +        TServer::TPtr GetServer() { +            UNIT_ASSERT(Server); +            return Server; +        } + +        TClient& GetClient() { +            UNIT_ASSERT(Client); +            return *Client; +        } + +        const TActorId& GetEdgeActor() const { +            return EdgeActor; +        } + +    private: +        static NKikimrPQ::TPQConfig DefaultPQConfig() { +            NKikimrPQ::TPQConfig pqConfig; +            pqConfig.SetEnabled(true); +            pqConfig.SetEnableProtoSourceIdInfo(true); +            pqConfig.SetRoundRobinPartitionMapping(true); +            pqConfig.SetTopicsAreFirstClassCitizen(true); +            pqConfig.SetMaxReadCookies(10); +            pqConfig.AddClientServiceType()->SetName("data-streams"); +            return pqConfig; +        } + +        static void SetupLogging(TTestActorRuntime& runtime) { +            runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); +            runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG); +            runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG); +        } + +    private: +        TPortManager PortManager; +        TServer::TPtr Server; +        TActorId EdgeActor; +        THolder<TClient> Client; + +    }; // TTestEnv + +    class TTestPqEnv: public TTestEnv<TTestPqEnv, TPersQueueClient> { +    public: +        using TTestEnv<TTestPqEnv, TPersQueueClient>::TTestEnv; + +        static THolder<TPersQueueClient> MakeClient(const NYdb::TDriver& driver, const TString& database) { +            return MakeHolder<TPersQueueClient>(driver, TPersQueueClientSettings().Database(database)); +        } +    }; + +    class TTestYdsEnv: public TTestEnv<TTestYdsEnv, TDataStreamsClient> { +    public: +        using TTestEnv<TTestYdsEnv, TDataStreamsClient>::TTestEnv; + +        static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) { +            return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database)); +        } +    };  + +    TShardedTableOptions SimpleTable() { +        return TShardedTableOptions() +            .Columns({ +                {"key", "Uint32", true, false}, +                {"value", "Uint32", false, false}, +            }); +    } + +    TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format) { +        return TCdcStream{ +            .Name = "Stream", +            .Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly, +            .Format = format, +        }; +    } + +    TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format) { +        return TCdcStream{ +            .Name = "Stream", +            .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate, +            .Format = format, +        }; +    } + +    TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format) { +        return TCdcStream{ +            .Name = "Stream", +            .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages, +            .Format = format, +        }; +    } + +    struct PqRunner { +        static void Run(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, +                const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records) +        { +            TTestPqEnv env(tableDesc, streamDesc); + +            for (const auto& query : queries) { +                ExecSQL(env.GetServer(), env.GetEdgeActor(), query); +            } + +            auto& client = env.GetClient(); + +            // add consumer +            { +                auto res = client.AddReadRule("/Root/Table/Stream", +                    TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user"))).ExtractValueSync(); +                UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +            } + +            // get records +            auto reader = client.CreateReadSession(TReadSessionSettings() +                .AppendTopics(TString("/Root/Table/Stream")) +                .ConsumerName("user") +                .DisableClusterDiscovery(true) +            ); + +            ui32 reads = 0; +            while (reads < records.size()) { +                auto ev = reader->GetEvent(true); +                UNIT_ASSERT(ev); + +                if (auto* data = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*ev)) { +                    for (const auto& item : data->GetMessages()) { +                        const auto& record = records.at(reads++); +                        UNIT_ASSERT_VALUES_EQUAL(record.first, item.GetPartitionKey()); +                        UNIT_ASSERT_VALUES_EQUAL(record.second, item.GetData()); +                    } +                } else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) { +                    create->Confirm(); +                } else if (auto* destroy = std::get_if<TReadSessionEvent::TDestroyPartitionStreamEvent>(&*ev)) { +                    destroy->Confirm(); +                } else if (std::get_if<TSessionClosedEvent>(&*ev)) { +                    break; +                } +            } + +            // remove consumer +            { +                auto res = client.RemoveReadRule("/Root/Table/Stream", +                    TRemoveReadRuleSettings().ConsumerName("user")).ExtractValueSync(); +                UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +            } +        } +    }; + +    struct YdsRunner { +        static void Run(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, +                const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records) +        { +            TTestYdsEnv env(tableDesc, streamDesc); + +            for (const auto& query : queries) { +                ExecSQL(env.GetServer(), env.GetEdgeActor(), query); +            } + +            auto& client = env.GetClient(); + +            // add consumer +            { +                auto res = client.RegisterStreamConsumer("/Root/Table/Stream", "user").ExtractValueSync(); +                UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +            } + +            // get shards +            TString shardId; +            { +                auto res = client.ListShards("/Root/Table/Stream", {}).ExtractValueSync(); +                UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +                UNIT_ASSERT_VALUES_EQUAL(res.GetResult().shards().size(), 1); +                shardId = res.GetResult().shards().begin()->shard_id(); +            } + +            // get iterator +            TString shardIt; +            { +                auto res = client.GetShardIterator("/Root/Table/Stream", shardId, Ydb::DataStreams::V1::ShardIteratorType::TRIM_HORIZON).ExtractValueSync(); +                UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +                shardIt = res.GetResult().shard_iterator(); +            } + +            // get records +            { +                auto res = client.GetRecords(shardIt).ExtractValueSync(); +                UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +                UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size()); + +                for (ui32 i = 0; i < records.size(); ++i) { +                    const auto& actual = res.GetResult().records().at(i); +                    const auto& expected = records.at(i); +                    UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), expected.first); +                    UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected.second); +                } +            } + +            // remove consumer +            { +                auto res = client.DeregisterStreamConsumer("/Root/Table/Stream", "user").ExtractValueSync(); +                UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +            } +        } +    }; + +    #define Y_UNIT_TEST_TWIN(N, VAR1, VAR2)                                                                            \ +        template<typename TRunner> void N(NUnitTest::TTestContext&);                                                   \ +        struct TTestRegistration##N {                                                                                  \ +            TTestRegistration##N() {                                                                                   \ +                TCurrentTest::AddTest(#N "[" #VAR1 "]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<VAR1>), false); \ +                TCurrentTest::AddTest(#N "[" #VAR2 "]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<VAR2>), false); \ +            }                                                                                                          \ +        };                                                                                                             \ +        static TTestRegistration##N testRegistration##N;                                                               \ +        template<typename TRunner>                                                                                     \ +        void N(NUnitTest::TTestContext&) + +    Y_UNIT_TEST_TWIN(KeysOnlyLog, PqRunner, YdsRunner) { +        TRunner::Run(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson), {R"( +            UPSERT INTO `/Root/Table` (key, value) VALUES +            (1, 10), +            (2, 20), +            (3, 30); +        )", R"( +            DELETE FROM `/Root/Table` WHERE key = 1; +        )"}, {  +            {"[1]", R"({"update":{}})"}, +            {"[2]", R"({"update":{}})"}, +            {"[3]", R"({"update":{}})"}, +            {"[1]", R"({"erase":{}})"}, +        }); +    } + +    Y_UNIT_TEST_TWIN(UpdatesLog, PqRunner, YdsRunner) { +        TRunner::Run(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {R"( +            UPSERT INTO `/Root/Table` (key, value) VALUES +            (1, 10), +            (2, 20), +            (3, 30); +        )", R"( +            DELETE FROM `/Root/Table` WHERE key = 1; +        )"}, {  +            {"[1]", R"({"update":{"value":10}})"}, +            {"[2]", R"({"update":{"value":20}})"}, +            {"[3]", R"({"update":{"value":30}})"}, +            {"[1]", R"({"erase":{}})"}, +        }); +    } + +    Y_UNIT_TEST_TWIN(NewAndOldImagesLog, PqRunner, YdsRunner) { +        TRunner::Run(SimpleTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatJson), {R"( +            UPSERT INTO `/Root/Table` (key, value) VALUES +            (1, 10), +            (2, 20), +            (3, 30); +        )", R"( +            UPSERT INTO `/Root/Table` (key, value) VALUES +            (1, 100), +            (2, 200), +            (3, 300); +        )", R"( +            DELETE FROM `/Root/Table` WHERE key = 1; +        )"}, {  +            {"[1]", R"({"update":{},"newImage":{"value":10}})"}, +            {"[2]", R"({"update":{},"newImage":{"value":20}})"}, +            {"[3]", R"({"update":{},"newImage":{"value":30}})"}, +            {"[1]", R"({"update":{},"newImage":{"value":100},"oldImage":{"value":10}})"}, +            {"[2]", R"({"update":{},"newImage":{"value":200},"oldImage":{"value":20}})"}, +            {"[3]", R"({"update":{},"newImage":{"value":300},"oldImage":{"value":30}})"}, +            {"[1]", R"({"erase":{},"oldImage":{"value":100}})"}, +        }); +    } + +} // Cdc  } // NKikimr diff --git a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt index 179d5251dca..8fabff31b6c 100644 --- a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt +++ b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt @@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-datashard-ut_change_exchange PUBLIC    ydb-core-tx    udf-service-exception_policy    public-lib-yson_value +  cpp-client-ydb_datastreams +  cpp-client-ydb_persqueue_public    cpp-client-ydb_result  )  target_sources(ydb-core-tx-datashard-ut_change_exchange PRIVATE diff --git a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt index abb32cd8af4..267d3ce3fc4 100644 --- a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt +++ b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt @@ -29,6 +29,8 @@ target_link_libraries(ydb-core-tx-datashard-ut_change_exchange PUBLIC    ydb-core-tx    udf-service-exception_policy    public-lib-yson_value +  cpp-client-ydb_datastreams +  cpp-client-ydb_persqueue_public    cpp-client-ydb_result  )  target_sources(ydb-core-tx-datashard-ut_change_exchange PRIVATE diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 708f62362f4..cf0ffbe726c 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -825,8 +825,6 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {                  }              case NKikimrSchemeOp::EPathTypeTableIndex:                  return true; -            case NKikimrSchemeOp::EPathTypeCdcStream: -                return true;              default:                  return false;              } diff --git a/ydb/core/tx/scheme_cache/scheme_cache.cpp b/ydb/core/tx/scheme_cache/scheme_cache.cpp index 5cf9a0b0d5e..2ccc4b31a11 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.cpp +++ b/ydb/core/tx/scheme_cache/scheme_cache.cpp @@ -72,6 +72,7 @@ TString TSchemeCacheNavigate::ToString(const NScheme::TTypeRegistry& typeRegistr          << " ErrorCount: " << ErrorCount          << " DatabaseName: " << DatabaseName          << " DomainOwnerId: " << DomainOwnerId +        << " Instant: " << Instant          << " ResultSet [" << ResultSetToString(ResultSet, typeRegistry) << "]"      << " }";  } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index 9263fbc9298..9bf454440d1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -421,8 +421,11 @@ public:                  .IsResolved()                  .NotDeleted()                  .IsPQGroup() -                .NotUnderOperation() -                .IsCommonSensePath(); +                .NotUnderOperation(); + +            if (!Transaction.GetAllowAccessToPrivatePaths()) { +                checks.IsCommonSensePath(); +            }              if (!checks) {                  TString explain = TStringBuilder() << "path fail checks" @@ -452,7 +455,6 @@ public:          }          newTabletConfig = tabletConfig; -          TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);          if (!alterData) {              result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 6dd1cf10e18..758a7e9eb6a 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -120,6 +120,10 @@ public:          CHECK_VALID_AND_RETURN(PrimaryFullPath);      } +    void SetPrimaryPath(const TString& path) { +        PrimaryFullPath = path; +    } +      TString GetSecondaryPath() {          Y_FAIL("UNIMPLEMENTED");          CHECK_VALID_AND_RETURN(SecondaryFullPath); diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 9987c8d4c02..f7b26e84403 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -285,11 +285,10 @@ namespace NKikimr::NDataStreams::V1 {          const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,          const NKikimrSchemeOp::TDirEntry& selfInfo      ) { -        Y_UNUSED(pqGroupDescription);          Y_UNUSED(selfInfo);          TString error; -        if (!ValidateShardsCount(*GetProtoRequest(), groupConfig, error)) { +        if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) {              return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);          } @@ -326,11 +325,10 @@ namespace NKikimr::NDataStreams::V1 {          const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,          const NKikimrSchemeOp::TDirEntry& selfInfo      ) { -        Y_UNUSED(pqGroupDescription);          Y_UNUSED(selfInfo);          TString error; -        if (!ValidateShardsCount(*GetProtoRequest(), groupConfig, error) +        if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)              || !ValidateWriteSpeedLimit(*GetProtoRequest(), error, ctx)              || !ValidateRetentionPeriod(*GetProtoRequest(), groupConfig, Nothing(), error))          { @@ -868,8 +866,8 @@ namespace NKikimr::NDataStreams::V1 {      //----------------------------------------------------------------------------------------- -    class TRegisterStreamConsumerActor : public TUpdateSchemeActor<TRegisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest> { -        using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest>; +    class TRegisterStreamConsumerActor : public TUpdateSchemeActor<TRegisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest, true> { +        using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest, true>;      public:          TRegisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest* request); @@ -943,8 +941,8 @@ namespace NKikimr::NDataStreams::V1 {      //----------------------------------------------------------------------------------------- -    class TDeregisterStreamConsumerActor : public TUpdateSchemeActor<TDeregisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest> { -        using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest>; +    class TDeregisterStreamConsumerActor : public TUpdateSchemeActor<TDeregisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest, true> { +        using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest, true>;      public:          TDeregisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest* request); @@ -1101,8 +1099,11 @@ namespace NKikimr::NDataStreams::V1 {              auto partitionId = partition.GetPartitionId();              TString shardName = GetShardName(partitionId);              if (shardName == ShardId) { -                TShardIterator it(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber); -                SendResponse(ctx, it); +                if (topicInfo->ShowPrivatePath) { +                    SendResponse(ctx, TShardIterator::Cdc(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); +                } else { +                    SendResponse(ctx, TShardIterator::Common(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); +                }                  return;              }          } @@ -1187,7 +1188,7 @@ namespace NKikimr::NDataStreams::V1 {                                    TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]", ctx);          } -        SendDescribeProposeRequest(ctx); +        SendDescribeProposeRequest(ctx, ShardIterator.IsCdcTopic());          Become(&TGetRecordsActor::StateWork);      } @@ -1334,7 +1335,7 @@ namespace NKikimr::NDataStreams::V1 {          TShardIterator shardIterator(ShardIterator.GetStreamName(),                                       ShardIterator.GetStreamArn(),                                       ShardIterator.GetShardId(), -                                     timestamp, seqNo); +                                     timestamp, seqNo, ShardIterator.GetKind());          result.set_next_shard_iterator(shardIterator.Serialize());          result.set_millis_behind_latest(millisBehindLatestMs); diff --git a/ydb/services/datastreams/shard_iterator.h b/ydb/services/datastreams/shard_iterator.h index 7b6ce9c5cd5..626f5110b7c 100644 --- a/ydb/services/datastreams/shard_iterator.h +++ b/ydb/services/datastreams/shard_iterator.h @@ -21,16 +21,29 @@ namespace NKikimr::NDataStreams::V1 {        }        TShardIterator(const TString& streamName, const TString& streamArn, -                     ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) { +                     ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber, +                     NKikimrPQ::TYdsShardIterator::ETopicKind kind = NKikimrPQ::TYdsShardIterator::KIND_COMMON) {            Proto.SetStreamName(streamName);            Proto.SetStreamArn(streamArn);            Proto.SetShardId(shardId);            Proto.SetReadTimestampMs(readTimestamp);            Proto.SetSequenceNumber(sequenceNumber);            Proto.SetCreationTimestampMs(TInstant::Now().MilliSeconds()); +          Proto.SetKind(kind);            Valid = true;        } +      static TShardIterator Common(const TString& streamName, const TString& streamArn, +                     ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) { +          return TShardIterator(streamName, streamArn, shardId, readTimestamp, sequenceNumber); +      } + +      static TShardIterator Cdc(const TString& streamName, const TString& streamArn, +                     ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) { +          return TShardIterator(streamName, streamArn, shardId, readTimestamp, sequenceNumber, +                NKikimrPQ::TYdsShardIterator::KIND_CDC); +      } +        TString Serialize() const {            TString data;            bool result = Proto.SerializeToString(&data); @@ -65,6 +78,14 @@ namespace NKikimr::NDataStreams::V1 {                Proto.GetCreationTimestampMs() < LIFETIME_MS;        } +      NKikimrPQ::TYdsShardIterator::ETopicKind GetKind() const { +          return Proto.GetKind(); +      } + +      bool IsCdcTopic() const { +          return Proto.GetKind() == NKikimrPQ::TYdsShardIterator::KIND_CDC; +      } +        bool IsValid() const {            return Valid;        } diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index b056a05e642..0b4214e7c86 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -58,22 +58,20 @@ namespace NKikimr::NGRpcProxy::V1 {          {          } -        void PrepareTopicPath(const NActors::TActorContext &ctx) { // ToDo !! -            TopicPath = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath); -        } -          TString GetTopicPath(const NActors::TActorContext& ctx) { -            PrepareTopicPath(ctx); -            return TopicPath; +            auto path = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath); +            if (PrivateTopicName) { +                path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName)); +            } +            return path;          }      protected:          // TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name);          void SendProposeRequest(const NActors::TActorContext &ctx) { -            PrepareTopicPath(ctx);              std::pair <TString, TString> pathPair;              try { -                pathPair = NKikimr::NGRpcService::SplitPath(TopicPath); +                pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath(ctx));              } catch (const std::exception &ex) {                  this->Request_->RaiseIssue(NYql::ExceptionToIssue(ex));                  return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); @@ -103,15 +101,15 @@ namespace NKikimr::NGRpcProxy::V1 {              }          } -        void SendDescribeProposeRequest(const NActors::TActorContext& ctx) { -            PrepareTopicPath(ctx); +        void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate = false) {              auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();              navigateRequest->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse(""));              NSchemeCache::TSchemeCacheNavigate::TEntry entry; -            entry.Path = NKikimr::SplitPath(TopicPath); +            entry.Path = NKikimr::SplitPath(GetTopicPath(ctx));              entry.SyncVersion = true; -            entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic; +            entry.ShowPrivatePath = showPrivate || PrivateTopicName.Defined(); +            entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;              navigateRequest->ResultSet.emplace_back(entry);              if (this->Request_->GetInternalToken().empty()) { @@ -157,7 +155,13 @@ namespace NKikimr::NGRpcProxy::V1 {              switch (response.Status) {              case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: { -                if (!result->ResultSet.front().PQGroupInfo) { +                if (!response.PQGroupInfo) { +                    if (response.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) { +                        Y_VERIFY(response.ListNodeEntry->Children.size() == 1); +                        PrivateTopicName = response.ListNodeEntry->Children.at(0).Name; +                        return SendDescribeProposeRequest(ctx); +                    } +                      this->Request_->RaiseIssue(                          FillIssue(                              TStringBuilder() << "path '" << path << "' creation is not completed", @@ -246,12 +250,13 @@ namespace NKikimr::NGRpcProxy::V1 {      private:          bool IsDead = false; -        TString TopicPath; +        const TString TopicPath; +        TMaybe<TString> PrivateTopicName;      };      //----------------------------------------------------------------------------------- -    template<class TDerived, class TRequest> +    template<class TDerived, class TRequest, bool AllowAccessToPrivatePaths = false>      class TUpdateSchemeActor : public TPQGrpcSchemaBase<TDerived, TRequest> {          using TBase = TPQGrpcSchemaBase<TDerived, TRequest>; @@ -269,12 +274,17 @@ namespace NKikimr::NGRpcProxy::V1 {              NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());              modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);              modifyScheme.SetWorkingDir(workingDir); +            modifyScheme.SetAllowAccessToPrivatePaths(AllowAccessToPrivatePaths);              auto* config = modifyScheme.MutableAlterPersQueueGroup();              Y_VERIFY(response.Self);              Y_VERIFY(response.PQGroupInfo);              config->CopyFrom(response.PQGroupInfo->Description); +            // keep previous values or set in ModifyPersqueueConfig +            config->ClearTotalGroupCount(); +            config->MutablePQTabletConfig()->ClearPartitionKeySchema(); +              {                  auto applyIf = modifyScheme.AddApplyIf();                  applyIf->SetPathId(response.Self->Info.GetPathId()); diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index 7c11732d8ec..9372254485b 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -708,7 +708,7 @@ private:      void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code, const TActorContext& ctx); - +    void DescribeTopics(const NActors::TActorContext& ctx, bool showPrivate = false);      bool ProcessTopicSchemeCacheResponse(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,                                           THashMap<TString, TTopicHolder>::iterator topicsIter, const TActorContext& ctx);      void HandleClientSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); @@ -1163,8 +1163,8 @@ public:  }; -class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest> { -    using TBase = TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest>; +class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest, true> { +    using TBase = TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest, true>;  public:      TAddReadRuleActor(NKikimr::NGRpcService::TEvPQAddReadRuleRequest *request); @@ -1176,8 +1176,8 @@ public:                                 const NKikimrSchemeOp::TDirEntry& selfInfo);  }; -class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest> { -    using TBase = TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest>; +class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest, true> { +    using TBase = TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest, true>;  public:      TRemoveReadRuleActor(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest* request); diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index d7209ef46bb..d83ae00434d 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -17,6 +17,7 @@  #include <library/cpp/protobuf/util/repeated_field_utils.h>  #include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> +#include <util/string/join.h>  #include <util/string/strip.h>  #include <util/charset/utf8.h> @@ -2630,8 +2631,8 @@ TReadInitAndAuthActor::TReadInitAndAuthActor(      , Counters(counters)      , LocalCluster(localCluster)  { -    for (const auto& t : topics) { -        Topics[t.first].TopicNameConverter = t.second; +    for (const auto& [path, converter] : topics) { +        Topics[path].TopicNameConverter = converter;      }  } @@ -2642,18 +2643,24 @@ TReadInitAndAuthActor::~TReadInitAndAuthActor() = default;  void TReadInitAndAuthActor::Bootstrap(const TActorContext &ctx) {      LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth for : " << ClientId);      Become(&TThis::StateFunc); +    DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token; +    DescribeTopics(ctx); +} + +void TReadInitAndAuthActor::DescribeTopics(const NActors::TActorContext& ctx, bool showPrivate) {      TVector<TString> topicNames; -    for (const auto& topic : Topics) { -        topicNames.emplace_back(topic.second.TopicNameConverter->GetPrimaryPath()); +    for (const auto& [_, holder] : Topics) { +        topicNames.emplace_back(holder.TopicNameConverter->GetPrimaryPath());      } -    DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token; -    ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true)); + +    LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " describe topics: " << JoinSeq(", ", topicNames)); +    ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true, showPrivate));  }  void TReadInitAndAuthActor::Die(const TActorContext& ctx) { -    for (auto& t : Topics) { -        if (t.second.PipeClient) -            NTabletPipe::CloseClient(ctx, t.second.PipeClient); +    for (auto& [_, holder] : Topics) { +        if (holder.PipeClient) +            NTabletPipe::CloseClient(ctx, holder.PipeClient);      }      LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth is DEAD"); @@ -2695,12 +2702,26 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(  void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) {      LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Handle describe topics response"); + +    bool reDescribe = false;      for (const auto& entry : ev->Get()->Result->ResultSet) { -        auto processResult = ProcessMetaCacheTopicResponse(entry);          auto path = JoinPath(entry.Path);          auto it = Topics.find(path);          Y_VERIFY(it != Topics.end()); +        if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) { +            Y_VERIFY(entry.ListNodeEntry->Children.size() == 1); +            const auto& topic = entry.ListNodeEntry->Children.at(0); + +            it->second.TopicNameConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name))); +            Topics[it->second.TopicNameConverter->GetPrimaryPath()] = it->second; +            Topics.erase(it); + +            reDescribe = true; +            continue; +        } + +        auto processResult = ProcessMetaCacheTopicResponse(entry);          if (processResult.IsFatal) {              Topics.erase(it);              if (Topics.empty()) { @@ -2711,14 +2732,21 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon                  continue;              }          } -        if (!ProcessTopicSchemeCacheResponse(entry, it, ctx)) -            return; +        if (!ProcessTopicSchemeCacheResponse(entry, it, ctx)) { +            return; +        }      } +      if (Topics.empty()) {          CloseSession("no topics found", PersQueue::ErrorCode::BAD_REQUEST, ctx);          return;      } + +    if (reDescribe) { +        return DescribeTopics(ctx, true); +    } +      // ToDo[migration] - separate option - ?      bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen();      if (doCheckClientAcl) { @@ -2812,9 +2840,9 @@ bool TReadInitAndAuthActor::CheckACLPermissionsForNavigate(  void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) {      TTopicTabletsPairs res; -    for (auto& t : Topics) { +    for (auto& [_, holder] : Topics) {          res.emplace_back(decltype(res)::value_type({ -            t.second.TopicNameConverter, t.second.TabletID, t.second.CloudId, t.second.DbId, t.second.FolderId +            holder.TopicNameConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId          }));      }      ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res))); | 
