summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Pisarenko Grigoriy <[email protected]> 2025-07-14 17:50:32 +0500
committer GitHub <[email protected]> 2025-07-14 15:50:32 +0300
commit cad7475fab61c915093e170985ae563260b4570f (patch)
tree e60b88a740d3ab2dd0d92f9cea05db937439759c
parent d4276d05018609380a67dc6c9de6b44b90d2d942 (diff)
YQ-4418 improved s3 read / write partitions validation (#20081)
Co-authored-by: Copilot <[email protected]> Co-authored-by: Ivan Sukhov <[email protected]>
-rw-r--r-- ydb/core/external_sources/object_storage.cpp 16
-rw-r--r-- ydb/core/external_sources/object_storage_ut.cpp 20
-rw-r--r-- ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp 76
-rw-r--r-- ydb/library/yql/providers/s3/common/util.cpp 10
-rw-r--r-- ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp 127
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp 7
-rw-r--r-- ydb/tests/fq/s3/test_insert.py 2
7 files changed, 186 insertions, 72 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index e93ae9c0577..2c2de549d0a 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -63,9 +63,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
} else if (key.StartsWith("projection.") || key == "storage.location.template") {
objectStorage.mutable_projection()->insert({key, value});
} else if (lowerKey == "partitioned_by") {
- auto json = NSc::TValue::FromJsonThrow(value);
- for (const auto& column: json.GetArray()) {
- *objectStorage.add_partitioned_by() = column;
+ try {
+ const auto json = NSc::TValue::FromJsonThrow(value);
+ if (!json.IsArray()) {
+ throw TExternalSourceException() << "partitioned_by must be an array of column names";
+ }
+ for (const auto& column: json.GetArray()) {
+ if (!column.IsString()) {
+ throw TExternalSourceException() << "partitioned_by must be an array of strings";
+ }
+ *objectStorage.add_partitioned_by() = column;
+ }
+ } catch (const std::exception& e) {
+ throw TExternalSourceException() << "Failed to parse partitioned_by: " << e.what();
}
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv, "csv_delimiter"sv}, lowerKey)) {
objectStorage.mutable_format_setting()->insert({lowerKey, value});
diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp
index 129ad8febd7..f6bc281efae 100644
--- a/ydb/core/external_sources/object_storage_ut.cpp
+++ b/ydb/core/external_sources/object_storage_ut.cpp
@@ -89,6 +89,26 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards");
}
}
+
+ Y_UNIT_TEST(FailedPartitionedByValidation) {
+ const auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
+ NKikimrExternalSources::TSchema schema;
+ {
+ NKikimrExternalSources::TGeneral general;
+ general.mutable_attributes()->emplace("partitioned_by", "{\"year\": \"2025\"}");
+ UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of column names");
+ }
+ {
+ NKikimrExternalSources::TGeneral general;
+ general.mutable_attributes()->emplace("partitioned_by", "[{\"year\": \"2025\"}]");
+ UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of strings");
+ }
+ {
+ NKikimrExternalSources::TGeneral general;
+ general.mutable_attributes()->emplace("partitioned_by", "[{");
+ UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Failed to parse partitioned_by:");
+ }
+ }
}
} // NKikimr
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index 388bcf004b9..7326646ce91 100644
--- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -2697,6 +2697,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT(parser.TryNextRow());
UNIT_ASSERT(!parser.ColumnParser(0).GetOptionalString());
}
+
+ Y_UNIT_TEST(TestRawFormatInsertValidation) {
+ const TString bucket = "test_raw_format_insert_validation_bucket";
+ CreateBucket(bucket);
+
+ auto kikimr = NTestUtils::MakeKikimrRunner();
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ {
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );)",
+ "location"_a = GetBucketLocation(bucket)
+ );
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ auto db = kikimr->GetQueryClient();
+ const TString query = R"(
+ INSERT INTO test_bucket.`/result/` WITH (
+ FORMAT = "raw",
+ SCHEMA (
+ data String??
+ )
+ )
+ (data)
+ VALUES
+ ("some_string")
+ )";
+ const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ const auto& issues = result.GetIssues().ToString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
+ UNIT_ASSERT_STRING_CONTAINS(issues, "Only a column with a primitive type is allowed for the raw format");
+ }
+
+ Y_UNIT_TEST(TestPartitionedByInsertValidation) {
+ const TString bucket = "test_partitioned_by_insert_validation_bucket";
+ CreateBucket(bucket);
+
+ auto kikimr = NTestUtils::MakeKikimrRunner();
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ {
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `test_bucket` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );)",
+ "location"_a = GetBucketLocation(bucket)
+ );
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ auto db = kikimr->GetQueryClient();
+ const TString query = R"(
+ INSERT INTO test_bucket.`/result/` WITH (
+ FORMAT = "csv_with_names",
+ PARTITIONED_BY = (data)
+ )
+ (data)
+ VALUES
+ ("some_string")
+ )";
+ const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ const auto& issues = result.GetIssues().ToString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues);
+ UNIT_ASSERT_STRING_CONTAINS(issues, "Write schema contains no columns except partitioning columns.");
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/providers/s3/common/util.cpp b/ydb/library/yql/providers/s3/common/util.cpp
index 57b7c81ccbf..89346176e61 100644
--- a/ydb/library/yql/providers/s3/common/util.cpp
+++ b/ydb/library/yql/providers/s3/common/util.cpp
@@ -218,9 +218,13 @@ bool ValidateS3WriteSchema(TPositionHandle pos, std::string_view format, const T
return false;
}
- const TDataExprType* rowType;
- bool isOptional;
- return EnsureDataOrOptionalOfData(pos, schemaStructRowType->GetItems().front()->GetItemType(), isOptional, rowType, ctx);
+ const auto* rowType = schemaStructRowType->GetItems().front()->GetItemType();
+ if (rowType->GetKind() != ETypeAnnotationKind::Data) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Only a column with a primitive type is allowed for the raw format (you have field with type " << *rowType << ")"));
+ return false;
+ }
+
+ return true;
}
if (format == "json_list"sv) {
diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
index e9b9e9aeab5..5921c360a46 100644
--- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
+++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
@@ -29,7 +29,7 @@ TInstant Strptime(const TString& input, const TString& format) {
return TInstant::Seconds(seconds);
}
}
- ythrow yexception() << "Can't parse date " << input << " in format " << format;
+ throw yexception() << "Can't parse date " << input << " in format " << format;
}
TString Strftime(const char* format, TInstant time) {
@@ -42,7 +42,7 @@ TString Strftime(const char* format, TInstant time) {
if (r != 0) {
return TString(buf.Data(), r);
}
- ythrow yexception() << "Can't format date " << time << " in format " << format;
+ throw yexception() << "Can't format date " << time << " in format " << format;
}
@@ -84,19 +84,19 @@ i64 GetIntOrThrow(const NSc::TValue& json, const TString& error) {
}
}
- ythrow yexception() << error;
+ throw yexception() << error;
}
TString GetStringOrThrow(const NSc::TValue& json, const TString& error) {
if (json.IsString() || json.IsIntNumber() || json.IsNumber()) {
return json.ForceString();
}
- ythrow yexception() << error;
+ throw yexception() << error;
}
i64 GetBoolOrThrow(const NSc::TValue& json, const TString& error) {
if (json.IsNull()) {
- ythrow yexception() << error;
+ throw yexception() << error;
}
return json.IsTrue();
}
@@ -108,7 +108,7 @@ IPathGenerator::EIntervalUnit ToIntervalUnit(const TString& unit) {
return name.first;
}
}
- ythrow yexception() << "Invalid projection scheme: unit " << unit << " must be one of " << GetEnumAllNames<IPathGenerator::EIntervalUnit>();
+ throw yexception() << "Invalid projection scheme: unit " << unit << " must be one of " << GetEnumAllNames<IPathGenerator::EIntervalUnit>();
}
TMap<IPathGenerator::EType, TString> ToLowerType() {
@@ -126,13 +126,13 @@ IPathGenerator::EType ToType(const TString& type) {
return name.first;
}
}
- ythrow yexception() << "Invalid projection scheme: type " << type << " must be one of " << to_lower(GetEnumAllNames<IPathGenerator::EType>());
+ throw yexception() << "Invalid projection scheme: type " << type << " must be one of " << to_lower(GetEnumAllNames<IPathGenerator::EType>());
}
std::string fmtInteger(int32_t width, i64 value)
{
if (width > 64) {
- ythrow yexception() << "Digits cannot exceed 64, but received " << width;
+ throw yexception() << "Digits cannot exceed 64, but received " << width;
}
if (width == 0) {
return std::to_string(value);
@@ -188,7 +188,7 @@ TDuration FromUnit(int64_t interval, IPathGenerator::EIntervalUnit unit) {
case IPathGenerator::EIntervalUnit::MONTHS: // external special handling
case IPathGenerator::EIntervalUnit::YEARS:
default:
- ythrow yexception() << "Only the " << GetEnumAllNames<IPathGenerator::EIntervalUnit>() << " units are supported but got " << unit;
+ throw yexception() << "Only the " << GetEnumAllNames<IPathGenerator::EIntervalUnit>() << " units are supported but got " << unit;
}
}
@@ -219,12 +219,12 @@ TInstant AddUnit(TInstant current, int64_t interval, IPathGenerator::EIntervalUn
const TDuration delta = FromUnit(abs(interval), unit);
if (delta.GetValue() > std::numeric_limits<i64>::max()) {
- ythrow yexception() << "Interval is overflowed";
+ throw yexception() << "Interval is overflowed";
}
const i64 deltaValue = (interval > 0 ? 1LL : -1LL) * delta.GetValue();
if (IsOverflow(current.GetValue(), deltaValue)) {
- ythrow yexception() << "Timestamp is overflowed";
+ throw yexception() << "Timestamp is overflowed";
}
return interval > 0 ? current + delta : current - delta;
@@ -283,23 +283,23 @@ public:
TString Format(const TStringBuf& columnName, const TStringBuf& dataValue) const override {
auto it = ColumnConfig.find(columnName);
if (it == ColumnConfig.end()) {
- ythrow yexception() << columnName << " column not found in config";
+ throw yexception() << columnName << " column not found in config";
}
const auto& config = it->second;
switch (config.Type) {
case IPathGenerator::EType::UNDEFINED: {
- ythrow yexception() << columnName << " column has undefined type";
+ throw yexception() << columnName << " column has undefined type";
}
case IPathGenerator::EType::ENUM: {
if (Find(config.Values, dataValue) == config.Values.end()) {
- ythrow yexception() << dataValue << " data not found as enum item";
+ throw yexception() << dataValue << " data not found as enum item";
}
return TString{dataValue};
}
case IPathGenerator::EType::INTEGER: {
i64 value = 0;
if (!TryFromString(dataValue.data(), dataValue.size(), value)) {
- ythrow yexception() << dataValue << " data is not a int64";
+ throw yexception() << dataValue << " data is not a int64";
}
return fmtInteger(config.Digits, value);
}
@@ -314,23 +314,23 @@ public:
TString Parse(const TStringBuf& columnName, const TStringBuf& pathValue) const override {
auto it = ColumnConfig.find(columnName);
if (it == ColumnConfig.end()) {
- ythrow yexception() << columnName << " column not found in config";
+ throw yexception() << columnName << " column not found in config";
}
const auto& config = it->second;
switch (config.Type) {
case IPathGenerator::EType::UNDEFINED: {
- ythrow yexception() << columnName << " column has undefined type";
+ throw yexception() << columnName << " column has undefined type";
}
case IPathGenerator::EType::ENUM: {
if (Find(config.Values, pathValue) == config.Values.end()) {
- ythrow yexception() << pathValue << " value not found as enum item";
+ throw yexception() << pathValue << " value not found as enum item";
}
return TString{pathValue};
}
case IPathGenerator::EType::INTEGER: {
int64_t value = 0;
if (!TryFromString(pathValue.data(), pathValue.size(), value)) {
- ythrow yexception() << pathValue << " value is not a int64";
+ throw yexception() << pathValue << " value is not a int64";
}
return std::to_string(value);
}
@@ -354,7 +354,7 @@ private:
// Parse
void ParsePartitioningRules(const TString& config, const std::vector<TString>& partitionedBy) {
if (partitionedBy.empty()) {
- ythrow yexception() << "Partition by must always be specified";
+ throw yexception() << "Partition by must always be specified with projection";
}
if (!config) {
@@ -368,14 +368,14 @@ private:
NSc::TValue json = NSc::TValue::FromJsonThrow(config, NSc::TJsonOpts::JO_PARSER_DISALLOW_DUPLICATE_KEYS | NSc::TJsonOpts::JO_SORT_KEYS);
if (!json.IsDict()) {
- ythrow yexception() << "Invalid projection scheme: top-level element must be a dictionary";
+ throw yexception() << "Invalid projection scheme: top-level element must be a dictionary";
}
TMap<TString, TMap<TString, NSc::TValue>> projection;
- for (const auto& p: json.GetDict()) {
+ for (const auto& p : json.GetDict()) {
const auto path = GetPath(p.first);
if (path.empty()) {
- ythrow yexception() << "Invalid key: key should start with storage or projection, but got an empty value";
+ throw yexception() << "Invalid key: key should start with storage or projection, but got an empty value";
}
const TString& kind = path.front();
if (kind == "projection") {
@@ -383,23 +383,23 @@ private:
} else if (kind == "storage") {
AddStorage(p.first, p.second, path);
} else {
- ythrow yexception() << "Invalid key: key should start with storage or projection, but got " << p.first;
+ throw yexception() << "Invalid key: key should start with storage or projection, but got " << p.first;
}
}
DoParseProjection(projection);
DoValidateTemplate(partitionedBy);
- for (const auto& config: Config.Rules) {
+ for (const auto& config : Config.Rules) {
ColumnConfig[config.Name] = config;
}
}
void AddProjection(const TStringBuf& key, const NSc::TValue& json, const TVector<TString>& path, TMap<TString, TMap<TString, NSc::TValue>>& projection) {
if (path.size() != 3 && path.size() != 2) {
- ythrow yexception() << "The key must be three-component or two-component, but received " << key;
+ throw yexception() << "The key for 'projection' must be three-component or two-component, but received " << key;
}
if (path.size() == 2 && path[1] != "enabled") {
- ythrow yexception() << "Unknown key " << key;
+ throw yexception() << "Unknown key " << key;
}
if (path.size() == 2) {
@@ -412,11 +412,11 @@ private:
void AddStorage(const TStringBuf& key, const NSc::TValue& json, const TVector<TString>& path) {
if (path.size() != 3) {
- ythrow yexception() << "The key must be three-component, but received " << key;
+ throw yexception() << "The key for 'storage' must be three-component, but received " << key;
}
if (path[1] != "location" || path[2] != "template") {
- ythrow yexception() << "The key must be storage.location.template, but received " << key;
+ throw yexception() << "The key for 'storage' must be storage.location.template, but received " << key;
}
TString locationTemplate = GetStringOrThrow(json, "The storage.location.template must be a string");
@@ -425,13 +425,12 @@ private:
Config.LocationTemplate = TStringBuilder() << locationTemplate << '/';
}
-
void DoParseEnumType(const TString& columnName, const TString& type, const TMap<TString, NSc::TValue>& projection) {
if (!projection.contains("values")) {
- ythrow yexception() << "Invalid projection scheme: values are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: values are required field for " << columnName << " " << type;
}
if (!projection.contains("type")) {
- ythrow yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: type is a required field for " << columnName << " " << type;
}
for (const auto& p: projection) {
if (p.first == "type") {
@@ -444,37 +443,37 @@ private:
}
Config.Rules.push_back(IPathGenerator::TColumnPartitioningConfig{.Type=ToType(type), .Name=columnName, .Values=std::move(values)});
} else {
- ythrow yexception() << "Invalid projection scheme: enum element must include only type or values (as string) but got " << p.first;
+ throw yexception() << "Invalid projection scheme: enum element must include only type or values (as string) but got " << p.first;
}
}
}
void DoParseIntegerType(const TString& columnName, const TString& type, const TMap<TString, NSc::TValue>& projection) {
if (!projection.contains("type")) {
- ythrow yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type;
}
if (!projection.contains("min")) {
- ythrow yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type;
}
if (!projection.contains("max")) {
- ythrow yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type;
}
IPathGenerator::TColumnPartitioningConfig columnConfig;
columnConfig.Name = columnName;
columnConfig.Type = ToType(type);
- for (const auto& p: projection) {
+ for (const auto& p : projection) {
if (p.first == "type") {
// already processed
} else if (p.first == "min") {
- columnConfig.Min = GetIntOrThrow(p.second, "The min must be a number");
+ columnConfig.Min = GetIntOrThrow(p.second, "The min must be a number for " + type + " field");
} else if (p.first == "max") {
- columnConfig.Max = GetIntOrThrow(p.second, "The max must be a number");
+ columnConfig.Max = GetIntOrThrow(p.second, "The max must be a number for " + type + " field");
} else if (p.first == "interval") {
columnConfig.Interval = GetIntOrThrow(p.second, "The interval must be a number");
} else if (p.first == "digits") {
columnConfig.Digits = GetIntOrThrow(p.second, "The digits must be a number");
} else {
- ythrow yexception() << "Invalid projection scheme: integer element must include only type, min, max, interval, digits but got " << p.first;
+ throw yexception() << "Invalid projection scheme: integer element must include only type, min, max, interval, digits but got " << p.first;
}
}
Config.Rules.push_back(columnConfig);
@@ -482,16 +481,16 @@ private:
void DoParseDateType(const TString& columnName, const TString& type, const TMap<TString, NSc::TValue>& projection) {
if (!projection.contains("type")) {
- ythrow yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type;
}
if (!projection.contains("min")) {
- ythrow yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type;
}
if (!projection.contains("max")) {
- ythrow yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type;
}
if (!projection.contains("format")) {
- ythrow yexception() << "Invalid projection scheme: format are required field for " << columnName << " " << type;
+ throw yexception() << "Invalid projection scheme: format are required field for " << columnName << " " << type;
}
IPathGenerator::TColumnPartitioningConfig columnConfig;
columnConfig.Name = columnName;
@@ -500,9 +499,9 @@ private:
if (p.first == "type") {
// already processed
} else if (p.first == "min") {
- columnConfig.From = GetStringOrThrow(p.second, "The min must be a string");
+ columnConfig.From = GetStringOrThrow(p.second, "The min must be a string for " + type + " field");
} else if (p.first == "max") {
- columnConfig.To = GetStringOrThrow(p.second, "The max must be a string");
+ columnConfig.To = GetStringOrThrow(p.second, "The max must be a string for " + type + " field");
} else if (p.first == "format") {
columnConfig.Format = GetStringOrThrow(p.second, "The format must be a string");
} else if (p.first == "interval") {
@@ -510,7 +509,7 @@ private:
} else if (p.first == "unit") {
columnConfig.IntervalUnit = ToIntervalUnit(GetStringOrThrow(p.second, "The unit must be a string"));
} else {
- ythrow yexception() << "Invalid projection scheme: date element must include only type, min, max, format, interval, unit but got " << p.first;
+ throw yexception() << "Invalid projection scheme: date element must include only type, min, max, format, interval, unit but got " << p.first;
}
}
Config.Rules.push_back(columnConfig);
@@ -519,7 +518,7 @@ private:
void DoParseColumn(const TString& columnName, const TMap<TString, NSc::TValue>& projection) {
auto it = projection.find("type");
if (it == projection.end()) {
- ythrow yexception() << "Invalid projection scheme: type element must exist for the column" << columnName;
+ throw yexception() << "Invalid projection scheme: type element must exist for the column " << columnName;
}
const TString type = GetStringOrThrow(it->second, "The type must be a string for column " + columnName);
@@ -530,12 +529,12 @@ private:
} else if (type == "date") {
DoParseDateType(columnName, type, projection);
} else {
- ythrow yexception() << "Invalid projection scheme: type element should be one of enum, date, integer but got " << type;
+ throw yexception() << "Invalid projection scheme: type element should be one of enum, date, integer but got " << type;
}
}
void DoParseProjection(TMap<TString, TMap<TString, NSc::TValue>> projection) {
- for (const auto& p: projection) {
+ for (const auto& p : projection) {
DoParseColumn(p.first, p.second);
}
}
@@ -550,27 +549,27 @@ private:
}
TSet<TString> partitionedByColumns;
- for (const auto& columnName: partitionedBy) {
+ for (const auto& columnName : partitionedBy) {
partitionedByColumns.insert(columnName);
if (!vars.contains(columnName)) {
- ythrow yexception() << "Template " << Config.LocationTemplate << " must include ${" << columnName << "}";
+ throw yexception() << "Template " << Config.LocationTemplate << " must include ${" << columnName << "} from partitioned_by columns set";
}
}
TSet<TString> rulesColumns;
- for (const auto& rule: Config.Rules) {
+ for (const auto& rule : Config.Rules) {
rulesColumns.insert(rule.Name);
if (!vars.contains(rule.Name)) {
- ythrow yexception() << "Template " << Config.LocationTemplate << " must include ${" << rule.Name << "}";
+ throw yexception() << "Template " << Config.LocationTemplate << " must include ${" << rule.Name << "} from projection columns set";
}
}
- for (const auto& var: vars) {
+ for (const auto& var : vars) {
if (!partitionedByColumns.contains(var)) {
- ythrow yexception() << "Partitioned by column named " << var << " does not exist for template " << Config.LocationTemplate;
+ throw yexception() << "Partitioned by column named " << var << " does not exist for template " << Config.LocationTemplate;
}
if (!rulesColumns.contains(var)) {
- ythrow yexception() << "Projection column named " << var << " does not exist for template " << Config.LocationTemplate;
+ throw yexception() << "Projection column named " << var << " does not exist for template " << Config.LocationTemplate;
}
}
}
@@ -592,7 +591,7 @@ private:
}
if (Rules.empty()) {
- ythrow yexception() << "The projection contains an empty set of paths";
+ throw yexception() << "The projection contains an empty set of paths";
}
}
@@ -631,12 +630,12 @@ private:
size_t p = 0) {
if (rules.size() == p) {
if (result.size() == pathsLimit) {
- ythrow yexception() << "The limit on the number of paths has been reached: " << result.size() << " of " << pathsLimit;
+ throw yexception() << "The limit on the number of paths has been reached: " << result.size() << " of " << pathsLimit;
}
auto pib = result.emplace(locationTemplate, columnsWithValue);
if (!pib.second) {
- ythrow yexception() << "Location path " << locationTemplate << " is composed by different projection value sets " << FormatColumnValues(pib.first->second) << " and " << FormatColumnValues(columnsWithValue);
+ throw yexception() << "Location path " << locationTemplate << " is composed by different projection value sets " << FormatColumnValues(pib.first->second) << " and " << FormatColumnValues(columnsWithValue);
}
return;
}
@@ -653,7 +652,7 @@ private:
DoGenerateDate(rules, locationTemplate, columnsWithValue, result, pathsLimit, now, p);
break;
default:
- ythrow yexception() << "Only the enum, integer, date types are supported but got " << to_lower(ToString(rule.Type));
+ throw yexception() << "Only the enum, integer, date types are supported but got " << to_lower(ToString(rule.Type));
}
}
@@ -731,21 +730,21 @@ private:
if (std::numeric_limits<int32_t>::min() <= value && value <= std::numeric_limits<int32_t>::max()) {
return;
}
- ythrow yexception() << "The value " << value << " is not representable as an int32 type for column " << column;
+ throw yexception() << "The value " << value << " is not representable as an int32 type for column " << column;
}
static void CheckCastUint32(int64_t value, const TString& column) {
if (value >= 0 && value <= std::numeric_limits<uint32_t>::max()) {
return;
}
- ythrow yexception() << "The value " << value << " is not representable as an uint32 type for column " << column;
+ throw yexception() << "The value " << value << " is not representable as an uint32 type for column " << column;
}
static void CheckCastUint64(int64_t value, const TString& column) {
if (value >= 0) {
return;
}
- ythrow yexception() << "The value " << value << " is not representable as an uint64 type for column " << column;
+ throw yexception() << "The value " << value << " is not representable as an uint64 type for column " << column;
}
};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 9748f0c7717..0ec8cd04756 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -438,11 +438,16 @@ private:
return nullptr;
}
} else {
- ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), "Missed key column."));
+ ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), TStringBuilder() << "Missing key column for partitioning: '" << key->Content() << "'. Please ensure the column is included in the schema."));
return nullptr;
}
}
+ if (structType->GetSize() <= keysCount) {
+ ctx.AddError(TIssue(ctx.GetPosition(format.Pos()), TStringBuilder() << "Write schema contains no columns except partitioning columns."));
+ return nullptr;
+ }
+
TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8));
itemTypes.front() = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String));
diff --git a/ydb/tests/fq/s3/test_insert.py b/ydb/tests/fq/s3/test_insert.py
index 2e43babe533..2033c6e797b 100644
--- a/ydb/tests/fq/s3/test_insert.py
+++ b/ydb/tests/fq/s3/test_insert.py
@@ -586,7 +586,7 @@ class TestS3(object):
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
issues = str(client.describe_query(query_id).result.query.issue)
- assert "Expected data or optional of data" in issues, "Incorrect Issues: " + issues
+ assert "Only a column with a primitive type is allowed for the raw format" in issues, "Incorrect Issues: " + issues
def get_insert_test_query(self, insert_path: str):
sql = f"INSERT INTO {insert_path} WITH (FORMAT = \"parquet\") SELECT\n"