diff options
author | chertus <azuikov@ydb.tech> | 2023-08-23 19:37:11 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-08-23 20:34:13 +0300 |
commit | d326ef1ecaa6f9550e0049ff24a23ca1eded8df8 (patch) | |
tree | 7c82e86992591e87ab3e06578c9fde55e161561e | |
parent | 4a6616e74c1ebb1571d8e1cce45270796b9daa1f (diff) | |
download | ydb-d326ef1ecaa6f9550e0049ff24a23ca1eded8df8.tar.gz |
KIKIMR-19105 proto & C++ API for ClumnTable CreateTable
-rw-r--r-- | ydb/core/grpc_services/rpc_create_table.cpp | 52 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 28 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.h | 2 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 10 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 30 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.h | 9 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 34 |
7 files changed, 158 insertions, 7 deletions
diff --git a/ydb/core/grpc_services/rpc_create_table.cpp b/ydb/core/grpc_services/rpc_create_table.cpp index 857110255e..556004f466 100644 --- a/ydb/core/grpc_services/rpc_create_table.cpp +++ b/ydb/core/grpc_services/rpc_create_table.cpp @@ -101,6 +101,46 @@ private: ); } + bool MakeCreateColumnTable(const Ydb::Table::CreateTableRequest& req, const TString& tableName, + NKikimrSchemeOp::TModifyScheme& schemaProto, + StatusIds::StatusCode& code, NYql::TIssues& issues) { + schemaProto.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable); + auto tableDesc = schemaProto.MutableCreateColumnTable(); + tableDesc->SetName(tableName); + + auto schema = tableDesc->MutableSchema(); + schema->SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES); + + TString error; + if (!FillColumnDescription(*tableDesc, req.columns(), code, error)) { + issues.AddIssue(NYql::TIssue(error)); + return false; + } + + schema->MutableKeyColumnNames()->CopyFrom(req.primary_key()); + + auto& hashSharding = *tableDesc->MutableSharding()->MutableHashSharding(); + hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N); + + if (req.has_partitioning_settings()) { + auto& partitioningSettings = req.partitioning_settings(); + hashSharding.MutableColumns()->CopyFrom(partitioningSettings.partition_by()); + if (partitioningSettings.min_partitions_count()) { + tableDesc->SetColumnShardCount(partitioningSettings.min_partitions_count()); + } + } + + if (req.has_ttl_settings()) { + if (!FillTtlSettings(*tableDesc->MutableTtlSettings()->MutableEnabled(), req.ttl_settings(), code, error)) { + issues.AddIssue(NYql::TIssue(error)); + return false; + } + } + tableDesc->MutableTtlSettings()->SetUseTiering(req.tiering()); + + return true; + } + void SendProposeRequest(const TActorContext &ctx) { const auto req = GetProtoRequest(); std::pair<TString, TString> pathPair; @@ -129,6 +169,18 @@ private: NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); modifyScheme->SetWorkingDir(workingDir); + + if (req->store_type() == Ydb::Table::StoreType::STORE_TYPE_COLUMN) { + StatusIds::StatusCode code = StatusIds::SUCCESS; + NYql::TIssues issues; + if (MakeCreateColumnTable(*req, name, *modifyScheme, code, issues)) { + ctx.Send(MakeTxProxyID(), proposeRequest.release()); + } else { + Reply(code, issues, ctx); + } + return; + } + NKikimrSchemeOp::TTableDescription* tableDesc = nullptr; if (req->indexesSize()) { modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 6beb381edd..758c271427 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -188,6 +188,8 @@ void FillColumnDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSc out.set_tiering(in.GetTtlSettings().GetUseTiering()); } } + + out.set_store_type(Ydb::Table::StoreType::STORE_TYPE_COLUMN); } bool ExtractColumnTypeInfo(NScheme::TTypeInfo& outTypeInfo, TString& outTypeMod, @@ -290,6 +292,32 @@ bool FillColumnDescription(NKikimrSchemeOp::TTableDescription& out, return true; } +bool FillColumnDescription(NKikimrSchemeOp::TColumnTableDescription& out, + const google::protobuf::RepeatedPtrField<Ydb::Table::ColumnMeta>& in, Ydb::StatusIds::StatusCode& status, TString& error) { + auto* schema = out.MutableSchema(); + + for (const auto& column : in) { + if (column.type().has_pg_type()) { + status = Ydb::StatusIds::BAD_REQUEST; + error = "Unsupported column type for column: " + column.name(); + return false; + } + + auto* columnDesc = schema->AddColumns(); + columnDesc->SetName(column.name()); + + NScheme::TTypeInfo typeInfo; + TString typeMod; + if (!ExtractColumnTypeInfo(typeInfo, typeMod, column.type(), status, error)) { + return false; + } + columnDesc->SetType(NScheme::TypeName(typeInfo, typeMod)); + columnDesc->SetNotNull(column.not_null()); + } + + return true; +} + template <typename TYdbProto> void FillTableBoundaryImpl(TYdbProto& out, const NKikimrSchemeOp::TTableDescription& in, const NKikimrMiniKQL::TType& splitKeyType) { diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 7a7be50aa5..d0e140a5f5 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -18,6 +18,8 @@ void FillColumnDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSc // in bool FillColumnDescription(NKikimrSchemeOp::TTableDescription& out, const google::protobuf::RepeatedPtrField<Ydb::Table::ColumnMeta>& in, Ydb::StatusIds::StatusCode& status, TString& error); +bool FillColumnDescription(NKikimrSchemeOp::TColumnTableDescription& out, + const google::protobuf::RepeatedPtrField<Ydb::Table::ColumnMeta>& in, Ydb::StatusIds::StatusCode& status, TString& error); bool ExtractColumnTypeInfo(NScheme::TTypeInfo& outTypeInfo, TString& outTypeMod, const Ydb::Type& inType, Ydb::StatusIds::StatusCode& status, TString& error); diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 83cd3d84eb..ebc8a943d3 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -477,6 +477,12 @@ message ReadReplicasSettings { reserved 3; // cluster_replicas_settings (part of oneof settings) } +enum StoreType { + STORE_TYPE_UNSPECIFIED = 0; + STORE_TYPE_ROW = 1; + STORE_TYPE_COLUMN = 2; +} + message CreateTableRequest { // Session identifier string session_id = 1; @@ -519,6 +525,8 @@ message CreateTableRequest { string tiering = 18; // Is temporary table bool temporary = 19; + // Is table column or row oriented + StoreType store_type = 20; } message CreateTableResponse { @@ -718,6 +726,8 @@ message DescribeTableResult { string tiering = 16; // Is temporary table bool temporary = 17; + // Is table column or row oriented + StoreType store_type = 18; } message Query { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 9af6fc69f2..0e466de52f 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -306,6 +306,10 @@ class TTableDescription::TImpl { Tiering_ = proto.tiering(); } + if (proto.store_type()) { + StoreType_ = (proto.store_type() == Ydb::Table::STORE_TYPE_COLUMN) ? EStoreType::Column : EStoreType::Row; + } + // column families ColumnFamilies_.reserve(proto.column_families_size()); for (const auto& family : proto.column_families()) { @@ -502,6 +506,10 @@ public: ReadReplicasSettings_ = TReadReplicasSettings(mode, readReplicasCount); } + void SetStoreType(EStoreType type) { + StoreType_ = type; + } + const TVector<TString>& GetPrimaryKeyColumns() const { return PrimaryKey_; } @@ -526,6 +534,10 @@ public: return Tiering_; } + EStoreType GetStoreType() const { + return StoreType_; + } + const TString& GetOwner() const { return Owner_; } @@ -619,6 +631,7 @@ private: TMaybe<TReadReplicasSettings> ReadReplicasSettings_; bool HasStorageSettings_ = false; bool HasPartitioningSettings_ = false; + EStoreType StoreType_ = EStoreType::Row; }; TTableDescription::TTableDescription() @@ -671,6 +684,10 @@ TMaybe<TString> TTableDescription::GetTiering() const { return Impl_->GetTiering(); } +EStoreType TTableDescription::GetStoreType() const { + return Impl_->GetStoreType(); +} + const TString& TTableDescription::GetOwner() const { return Impl_->GetOwner(); } @@ -779,6 +796,10 @@ void TTableDescription::SetReadReplicasSettings(TReadReplicasSettings::EMode mod Impl_->SetReadReplicasSettings(mode, readReplicasCount); } +void TTableDescription::SetStoreType(EStoreType type) { + Impl_->SetStoreType(type); +} + const TVector<TPartitionStats>& TTableDescription::GetPartitionStats() const { return Impl_->GetPartitionStats(); } @@ -858,6 +879,10 @@ void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) con request.set_tiering(*tiering); } + if (Impl_->GetStoreType() == EStoreType::Column) { + request.set_store_type(Ydb::Table::StoreType::STORE_TYPE_COLUMN); + } + if (Impl_->HasStorageSettings()) { request.mutable_storage_settings()->CopyFrom(Impl_->GetStorageSettings().GetProto()); } @@ -1036,6 +1061,11 @@ TColumnFamilyDescription TColumnFamilyBuilder::Build() const { //////////////////////////////////////////////////////////////////////////////// +TTableBuilder& TTableBuilder::SetStoreType(EStoreType type) { + TableDescription_.SetStoreType(type); + return *this; +} + TTableBuilder& TTableBuilder::AddNullableColumn(const TString& name, const EPrimitiveType& type, const TString& family) { auto columnType = TTypeBuilder() .BeginOptional() diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 629ed20193..4892a3a32b 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -455,6 +455,11 @@ private: struct TExplicitPartitions; struct TDescribeTableSettings; +enum class EStoreType { + Row = 0, + Column = 1 +}; + //! Represents table description class TTableDescription { friend class TTableBuilder; @@ -473,6 +478,7 @@ public: TVector<TChangefeedDescription> GetChangefeedDescriptions() const; TMaybe<TTtlSettings> GetTtlSettings() const; TMaybe<TString> GetTiering() const; + EStoreType GetStoreType() const; // Deprecated. Use GetEntry() of TDescribeTableResult instead const TString& GetOwner() const; @@ -554,6 +560,7 @@ private: void SetPartitioningSettings(const TPartitioningSettings& settings); void SetKeyBloomFilter(bool enabled); void SetReadReplicasSettings(TReadReplicasSettings::EMode mode, ui64 readReplicasCount); + void SetStoreType(EStoreType type); const Ydb::Table::DescribeTableResult& GetProto() const; class TImpl; @@ -723,6 +730,8 @@ class TTableBuilder { public: TTableBuilder() = default; + TTableBuilder& SetStoreType(EStoreType type); + TTableBuilder& AddNullableColumn(const TString& name, const EPrimitiveType& type, const TString& family = TString()); TTableBuilder& AddNullableColumn(const TString& name, const TDecimalType& type, const TString& family = TString()); TTableBuilder& AddNullableColumn(const TString& name, const TPgType& type, const TString& family = TString()); diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 4fe5b2e040..10867cf5ec 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -263,7 +263,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } } - Y_UNIT_TEST(ParquetImportBug) { + void ParquetImportBug(bool columnTable) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); @@ -287,6 +287,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { }; auto tableBuilder = client.GetTableBuilder(); + auto tableType = columnTable ? NYdb::NTable::EStoreType::Column : NYdb::NTable::EStoreType::Row; + tableBuilder.SetStoreType(tableType); for (auto& [name, type] : schema) { if (name == "id") { tableBuilder.AddNonNullableColumn(name, type); @@ -295,11 +297,25 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } } tableBuilder.SetPrimaryKeyColumns({"id"}); + if (columnTable) { + NYdb::NTable::TTablePartitioningSettingsBuilder partsBuilder(tableBuilder); + partsBuilder.SetMinPartitionsCount(1); + partsBuilder.EndPartitioningSettings(); + } auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync(); UNIT_ASSERT_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + { + auto descResult = session.DescribeTable(tablePath, {}).ExtractValueSync(); + UNIT_ASSERT_EQUAL(descResult.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(descResult.GetStatus(), EStatus::SUCCESS); + + Cerr << "Table type: " << (ui32) descResult.GetTableDescription().GetStoreType() << Endl; + UNIT_ASSERT_EQUAL(descResult.GetTableDescription().GetStoreType(), tableType); + } + auto batchSchema = std::make_shared<arrow::Schema>( std::vector<std::shared_ptr<arrow::Field>>{ arrow::field("id", arrow::uint32()), @@ -343,6 +359,14 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { UNIT_ASSERT_GT(rows.size(), 0); } } +#if 0 // TODO: KIKIMR-18717 + Y_UNIT_TEST(ParquetImportBug) { + ParquetImportBug(true); + } +#endif + Y_UNIT_TEST(ParquetImportBug_Datashard) { + ParquetImportBug(false); + } Y_UNIT_TEST(UpsertCsvBug) { NKikimrConfig::TAppConfig appConfig; @@ -687,9 +711,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { tableBuilder.AddNullableColumn(name, type); } tableBuilder.SetPrimaryKeyColumns({"timestamp"}); - NYdb::NTable::TCreateTableSettings tableSettings; - //tableSettings.PartitioningPolicy(NYdb::NTable::TPartitioningPolicy().UniformPartitions(2)); - auto result = session.CreateTable(tablePath, tableBuilder.Build(), tableSettings).ExtractValueSync(); + auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync(); UNIT_ASSERT_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); @@ -796,9 +818,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { tableBuilder.AddNullableColumn(name, type); } tableBuilder.SetPrimaryKeyColumns({"timestamp"}); - NYdb::NTable::TCreateTableSettings tableSettings; - //tableSettings.PartitioningPolicy(NYdb::NTable::TPartitioningPolicy().UniformPartitions(2)); - auto result = session.CreateTable(tablePath, tableBuilder.Build(), tableSettings).ExtractValueSync(); + auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync(); UNIT_ASSERT_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); |