aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-08-23 19:37:11 +0300
committerchertus <azuikov@ydb.tech>2023-08-23 20:34:13 +0300
commitd326ef1ecaa6f9550e0049ff24a23ca1eded8df8 (patch)
tree7c82e86992591e87ab3e06578c9fde55e161561e
parent4a6616e74c1ebb1571d8e1cce45270796b9daa1f (diff)
downloadydb-d326ef1ecaa6f9550e0049ff24a23ca1eded8df8.tar.gz
KIKIMR-19105 proto & C++ API for ClumnTable CreateTable
-rw-r--r--ydb/core/grpc_services/rpc_create_table.cpp52
-rw-r--r--ydb/core/ydb_convert/table_description.cpp28
-rw-r--r--ydb/core/ydb_convert/table_description.h2
-rw-r--r--ydb/public/api/protos/ydb_table.proto10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp30
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h9
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp34
7 files changed, 158 insertions, 7 deletions
diff --git a/ydb/core/grpc_services/rpc_create_table.cpp b/ydb/core/grpc_services/rpc_create_table.cpp
index 857110255e..556004f466 100644
--- a/ydb/core/grpc_services/rpc_create_table.cpp
+++ b/ydb/core/grpc_services/rpc_create_table.cpp
@@ -101,6 +101,46 @@ private:
);
}
+ bool MakeCreateColumnTable(const Ydb::Table::CreateTableRequest& req, const TString& tableName,
+ NKikimrSchemeOp::TModifyScheme& schemaProto,
+ StatusIds::StatusCode& code, NYql::TIssues& issues) {
+ schemaProto.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable);
+ auto tableDesc = schemaProto.MutableCreateColumnTable();
+ tableDesc->SetName(tableName);
+
+ auto schema = tableDesc->MutableSchema();
+ schema->SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);
+
+ TString error;
+ if (!FillColumnDescription(*tableDesc, req.columns(), code, error)) {
+ issues.AddIssue(NYql::TIssue(error));
+ return false;
+ }
+
+ schema->MutableKeyColumnNames()->CopyFrom(req.primary_key());
+
+ auto& hashSharding = *tableDesc->MutableSharding()->MutableHashSharding();
+ hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N);
+
+ if (req.has_partitioning_settings()) {
+ auto& partitioningSettings = req.partitioning_settings();
+ hashSharding.MutableColumns()->CopyFrom(partitioningSettings.partition_by());
+ if (partitioningSettings.min_partitions_count()) {
+ tableDesc->SetColumnShardCount(partitioningSettings.min_partitions_count());
+ }
+ }
+
+ if (req.has_ttl_settings()) {
+ if (!FillTtlSettings(*tableDesc->MutableTtlSettings()->MutableEnabled(), req.ttl_settings(), code, error)) {
+ issues.AddIssue(NYql::TIssue(error));
+ return false;
+ }
+ }
+ tableDesc->MutableTtlSettings()->SetUseTiering(req.tiering());
+
+ return true;
+ }
+
void SendProposeRequest(const TActorContext &ctx) {
const auto req = GetProtoRequest();
std::pair<TString, TString> pathPair;
@@ -129,6 +169,18 @@ private:
NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
modifyScheme->SetWorkingDir(workingDir);
+
+ if (req->store_type() == Ydb::Table::StoreType::STORE_TYPE_COLUMN) {
+ StatusIds::StatusCode code = StatusIds::SUCCESS;
+ NYql::TIssues issues;
+ if (MakeCreateColumnTable(*req, name, *modifyScheme, code, issues)) {
+ ctx.Send(MakeTxProxyID(), proposeRequest.release());
+ } else {
+ Reply(code, issues, ctx);
+ }
+ return;
+ }
+
NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
if (req->indexesSize()) {
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable);
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 6beb381edd..758c271427 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -188,6 +188,8 @@ void FillColumnDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSc
out.set_tiering(in.GetTtlSettings().GetUseTiering());
}
}
+
+ out.set_store_type(Ydb::Table::StoreType::STORE_TYPE_COLUMN);
}
bool ExtractColumnTypeInfo(NScheme::TTypeInfo& outTypeInfo, TString& outTypeMod,
@@ -290,6 +292,32 @@ bool FillColumnDescription(NKikimrSchemeOp::TTableDescription& out,
return true;
}
+bool FillColumnDescription(NKikimrSchemeOp::TColumnTableDescription& out,
+ const google::protobuf::RepeatedPtrField<Ydb::Table::ColumnMeta>& in, Ydb::StatusIds::StatusCode& status, TString& error) {
+ auto* schema = out.MutableSchema();
+
+ for (const auto& column : in) {
+ if (column.type().has_pg_type()) {
+ status = Ydb::StatusIds::BAD_REQUEST;
+ error = "Unsupported column type for column: " + column.name();
+ return false;
+ }
+
+ auto* columnDesc = schema->AddColumns();
+ columnDesc->SetName(column.name());
+
+ NScheme::TTypeInfo typeInfo;
+ TString typeMod;
+ if (!ExtractColumnTypeInfo(typeInfo, typeMod, column.type(), status, error)) {
+ return false;
+ }
+ columnDesc->SetType(NScheme::TypeName(typeInfo, typeMod));
+ columnDesc->SetNotNull(column.not_null());
+ }
+
+ return true;
+}
+
template <typename TYdbProto>
void FillTableBoundaryImpl(TYdbProto& out,
const NKikimrSchemeOp::TTableDescription& in, const NKikimrMiniKQL::TType& splitKeyType) {
diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h
index 7a7be50aa5..d0e140a5f5 100644
--- a/ydb/core/ydb_convert/table_description.h
+++ b/ydb/core/ydb_convert/table_description.h
@@ -18,6 +18,8 @@ void FillColumnDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSc
// in
bool FillColumnDescription(NKikimrSchemeOp::TTableDescription& out,
const google::protobuf::RepeatedPtrField<Ydb::Table::ColumnMeta>& in, Ydb::StatusIds::StatusCode& status, TString& error);
+bool FillColumnDescription(NKikimrSchemeOp::TColumnTableDescription& out,
+ const google::protobuf::RepeatedPtrField<Ydb::Table::ColumnMeta>& in, Ydb::StatusIds::StatusCode& status, TString& error);
bool ExtractColumnTypeInfo(NScheme::TTypeInfo& outTypeInfo, TString& outTypeMod,
const Ydb::Type& inType, Ydb::StatusIds::StatusCode& status, TString& error);
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index 83cd3d84eb..ebc8a943d3 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -477,6 +477,12 @@ message ReadReplicasSettings {
reserved 3; // cluster_replicas_settings (part of oneof settings)
}
+enum StoreType {
+ STORE_TYPE_UNSPECIFIED = 0;
+ STORE_TYPE_ROW = 1;
+ STORE_TYPE_COLUMN = 2;
+}
+
message CreateTableRequest {
// Session identifier
string session_id = 1;
@@ -519,6 +525,8 @@ message CreateTableRequest {
string tiering = 18;
// Is temporary table
bool temporary = 19;
+ // Is table column or row oriented
+ StoreType store_type = 20;
}
message CreateTableResponse {
@@ -718,6 +726,8 @@ message DescribeTableResult {
string tiering = 16;
// Is temporary table
bool temporary = 17;
+ // Is table column or row oriented
+ StoreType store_type = 18;
}
message Query {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 9af6fc69f2..0e466de52f 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -306,6 +306,10 @@ class TTableDescription::TImpl {
Tiering_ = proto.tiering();
}
+ if (proto.store_type()) {
+ StoreType_ = (proto.store_type() == Ydb::Table::STORE_TYPE_COLUMN) ? EStoreType::Column : EStoreType::Row;
+ }
+
// column families
ColumnFamilies_.reserve(proto.column_families_size());
for (const auto& family : proto.column_families()) {
@@ -502,6 +506,10 @@ public:
ReadReplicasSettings_ = TReadReplicasSettings(mode, readReplicasCount);
}
+ void SetStoreType(EStoreType type) {
+ StoreType_ = type;
+ }
+
const TVector<TString>& GetPrimaryKeyColumns() const {
return PrimaryKey_;
}
@@ -526,6 +534,10 @@ public:
return Tiering_;
}
+ EStoreType GetStoreType() const {
+ return StoreType_;
+ }
+
const TString& GetOwner() const {
return Owner_;
}
@@ -619,6 +631,7 @@ private:
TMaybe<TReadReplicasSettings> ReadReplicasSettings_;
bool HasStorageSettings_ = false;
bool HasPartitioningSettings_ = false;
+ EStoreType StoreType_ = EStoreType::Row;
};
TTableDescription::TTableDescription()
@@ -671,6 +684,10 @@ TMaybe<TString> TTableDescription::GetTiering() const {
return Impl_->GetTiering();
}
+EStoreType TTableDescription::GetStoreType() const {
+ return Impl_->GetStoreType();
+}
+
const TString& TTableDescription::GetOwner() const {
return Impl_->GetOwner();
}
@@ -779,6 +796,10 @@ void TTableDescription::SetReadReplicasSettings(TReadReplicasSettings::EMode mod
Impl_->SetReadReplicasSettings(mode, readReplicasCount);
}
+void TTableDescription::SetStoreType(EStoreType type) {
+ Impl_->SetStoreType(type);
+}
+
const TVector<TPartitionStats>& TTableDescription::GetPartitionStats() const {
return Impl_->GetPartitionStats();
}
@@ -858,6 +879,10 @@ void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) con
request.set_tiering(*tiering);
}
+ if (Impl_->GetStoreType() == EStoreType::Column) {
+ request.set_store_type(Ydb::Table::StoreType::STORE_TYPE_COLUMN);
+ }
+
if (Impl_->HasStorageSettings()) {
request.mutable_storage_settings()->CopyFrom(Impl_->GetStorageSettings().GetProto());
}
@@ -1036,6 +1061,11 @@ TColumnFamilyDescription TColumnFamilyBuilder::Build() const {
////////////////////////////////////////////////////////////////////////////////
+TTableBuilder& TTableBuilder::SetStoreType(EStoreType type) {
+ TableDescription_.SetStoreType(type);
+ return *this;
+}
+
TTableBuilder& TTableBuilder::AddNullableColumn(const TString& name, const EPrimitiveType& type, const TString& family) {
auto columnType = TTypeBuilder()
.BeginOptional()
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index 629ed20193..4892a3a32b 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -455,6 +455,11 @@ private:
struct TExplicitPartitions;
struct TDescribeTableSettings;
+enum class EStoreType {
+ Row = 0,
+ Column = 1
+};
+
//! Represents table description
class TTableDescription {
friend class TTableBuilder;
@@ -473,6 +478,7 @@ public:
TVector<TChangefeedDescription> GetChangefeedDescriptions() const;
TMaybe<TTtlSettings> GetTtlSettings() const;
TMaybe<TString> GetTiering() const;
+ EStoreType GetStoreType() const;
// Deprecated. Use GetEntry() of TDescribeTableResult instead
const TString& GetOwner() const;
@@ -554,6 +560,7 @@ private:
void SetPartitioningSettings(const TPartitioningSettings& settings);
void SetKeyBloomFilter(bool enabled);
void SetReadReplicasSettings(TReadReplicasSettings::EMode mode, ui64 readReplicasCount);
+ void SetStoreType(EStoreType type);
const Ydb::Table::DescribeTableResult& GetProto() const;
class TImpl;
@@ -723,6 +730,8 @@ class TTableBuilder {
public:
TTableBuilder() = default;
+ TTableBuilder& SetStoreType(EStoreType type);
+
TTableBuilder& AddNullableColumn(const TString& name, const EPrimitiveType& type, const TString& family = TString());
TTableBuilder& AddNullableColumn(const TString& name, const TDecimalType& type, const TString& family = TString());
TTableBuilder& AddNullableColumn(const TString& name, const TPgType& type, const TString& family = TString());
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index 4fe5b2e040..10867cf5ec 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -263,7 +263,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
}
- Y_UNIT_TEST(ParquetImportBug) {
+ void ParquetImportBug(bool columnTable) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
@@ -287,6 +287,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
};
auto tableBuilder = client.GetTableBuilder();
+ auto tableType = columnTable ? NYdb::NTable::EStoreType::Column : NYdb::NTable::EStoreType::Row;
+ tableBuilder.SetStoreType(tableType);
for (auto& [name, type] : schema) {
if (name == "id") {
tableBuilder.AddNonNullableColumn(name, type);
@@ -295,11 +297,25 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
}
tableBuilder.SetPrimaryKeyColumns({"id"});
+ if (columnTable) {
+ NYdb::NTable::TTablePartitioningSettingsBuilder partsBuilder(tableBuilder);
+ partsBuilder.SetMinPartitionsCount(1);
+ partsBuilder.EndPartitioningSettings();
+ }
auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync();
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ {
+ auto descResult = session.DescribeTable(tablePath, {}).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(descResult.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(descResult.GetStatus(), EStatus::SUCCESS);
+
+ Cerr << "Table type: " << (ui32) descResult.GetTableDescription().GetStoreType() << Endl;
+ UNIT_ASSERT_EQUAL(descResult.GetTableDescription().GetStoreType(), tableType);
+ }
+
auto batchSchema = std::make_shared<arrow::Schema>(
std::vector<std::shared_ptr<arrow::Field>>{
arrow::field("id", arrow::uint32()),
@@ -343,6 +359,14 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
UNIT_ASSERT_GT(rows.size(), 0);
}
}
+#if 0 // TODO: KIKIMR-18717
+ Y_UNIT_TEST(ParquetImportBug) {
+ ParquetImportBug(true);
+ }
+#endif
+ Y_UNIT_TEST(ParquetImportBug_Datashard) {
+ ParquetImportBug(false);
+ }
Y_UNIT_TEST(UpsertCsvBug) {
NKikimrConfig::TAppConfig appConfig;
@@ -687,9 +711,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
tableBuilder.AddNullableColumn(name, type);
}
tableBuilder.SetPrimaryKeyColumns({"timestamp"});
- NYdb::NTable::TCreateTableSettings tableSettings;
- //tableSettings.PartitioningPolicy(NYdb::NTable::TPartitioningPolicy().UniformPartitions(2));
- auto result = session.CreateTable(tablePath, tableBuilder.Build(), tableSettings).ExtractValueSync();
+ auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync();
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
@@ -796,9 +818,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
tableBuilder.AddNullableColumn(name, type);
}
tableBuilder.SetPrimaryKeyColumns({"timestamp"});
- NYdb::NTable::TCreateTableSettings tableSettings;
- //tableSettings.PartitioningPolicy(NYdb::NTable::TPartitioningPolicy().UniformPartitions(2));
- auto result = session.CreateTable(tablePath, tableBuilder.Build(), tableSettings).ExtractValueSync();
+ auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync();
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);