diff options
author | ivanmorozov <[email protected]> | 2023-05-05 21:03:22 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-05-05 21:03:22 +0300 |
commit | 3b5a0620ae4bf7b16da5934b17f2ae4b6851865c (patch) | |
tree | fcc901dfbee6070ce820843d3f64f7a94e493bc0 | |
parent | cffea5310713f2f5cfc5bd130e90aa49efc6d6ee (diff) |
ridetech logs processing, external tiers info, etc
-rw-r--r-- | ydb/core/kqp/common/kqp_resolve.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/sharding/sharding.cpp | 49 | ||||
-rw-r--r-- | ydb/core/tx/sharding/sharding.h | 49 | ||||
-rw-r--r-- | ydb/core/tx/sharding/unboxed_reader.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/sharding/unboxed_reader.h | 49 |
9 files changed, 115 insertions, 87 deletions
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h index ffc5eb7f577..6341284bfdb 100644 --- a/ydb/core/kqp/common/kqp_resolve.h +++ b/ydb/core/kqp/common/kqp_resolve.h @@ -2,6 +2,7 @@ #include <ydb/core/engine/mkql_keys.h> #include <ydb/core/tx/sharding/sharding.h> +#include <ydb/core/tx/sharding/unboxed_reader.h> #include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h> #include <ydb/core/protos/kqp_physical.pb.h> #include <ydb/core/scheme/scheme_tabledefs.h> diff --git a/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt index cab50904fd5..08200fb7797 100644 --- a/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt @@ -25,4 +25,5 @@ target_sources(core-tx-sharding PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp ) diff --git a/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt index 5e702794935..d0d22a1949a 100644 --- a/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt @@ -26,4 +26,5 @@ target_sources(core-tx-sharding PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp ) diff --git a/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt b/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt index 5e702794935..d0d22a1949a 100644 --- a/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt @@ -26,4 +26,5 @@ target_sources(core-tx-sharding PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp ) diff --git a/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt b/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt index cab50904fd5..08200fb7797 100644 --- a/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt @@ -25,4 +25,5 @@ target_sources(core-tx-sharding PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp ) diff --git a/ydb/core/tx/sharding/sharding.cpp b/ydb/core/tx/sharding/sharding.cpp index 4e2432a0d04..7f70bfd0234 100644 --- a/ydb/core/tx/sharding/sharding.cpp +++ b/ydb/core/tx/sharding/sharding.cpp @@ -1,7 +1,7 @@ #include "sharding.h" #include "xx_hash.h" +#include "unboxed_reader.h" #include <ydb/library/yql/utils/yql_panic.h> -#include <ydb/library/yql/minikql/mkql_node.h> #include <util/string/join.h> namespace NKikimr::NSharding { @@ -32,49 +32,6 @@ void TShardingBase::AppendField(const std::shared_ptr<arrow::Array>& array, int }); } -void TUnboxedValueReader::BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const { - for (auto&& i : ColumnsInfo) { - auto columnValue = value.GetElement(i.Idx); - if (columnValue.IsString()) { - hashCalcer.Update((const ui8*)columnValue.AsStringRef().Data(), columnValue.AsStringRef().Size()); - } else if (columnValue.IsEmbedded()) { - switch (i.Type.GetTypeId()) { - case NScheme::NTypeIds::Uint16: - FieldToHashString<ui16>(columnValue, hashCalcer); - continue; - case NScheme::NTypeIds::Uint32: - FieldToHashString<ui32>(columnValue, hashCalcer); - continue; - case NScheme::NTypeIds::Uint64: - FieldToHashString<ui64>(columnValue, hashCalcer); - continue; - case NScheme::NTypeIds::Int16: - FieldToHashString<i16>(columnValue, hashCalcer); - continue; - case NScheme::NTypeIds::Int32: - FieldToHashString<i32>(columnValue, hashCalcer); - continue; - case NScheme::NTypeIds::Int64: - FieldToHashString<i64>(columnValue, hashCalcer); - continue; - } - YQL_ENSURE(false, "incorrect column type for shard calculation"); - } else { - YQL_ENSURE(false, "incorrect column type for shard calculation"); - } - } -} - -TUnboxedValueReader::TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, - const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns) { - YQL_ENSURE(shardingColumns.size()); - for (auto&& i : shardingColumns) { - auto it = columnsRemap.find(i); - YQL_ENSURE(it != columnsRemap.end()); - ColumnsInfo.emplace_back(TColumnUnboxedPlaceInfo(it->second, structInfo->GetMemberIndex(i), i)); - } -} - std::unique_ptr<TShardingBase> TShardingBase::BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& sharding) { if (sharding.HasHashSharding()) { auto& hashSharding = sharding.GetHashSharding(); @@ -133,7 +90,7 @@ std::vector<ui64> THashSharding::MakeHashes(const std::shared_ptr<arrow::RecordB return out; } -ui64 THashSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { +ui64 THashSharding::CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { TStreamStringHashCalcer hashCalcer(Seed); hashCalcer.Start(); readerInfo.BuildStringForHash(value, hashCalcer); @@ -198,7 +155,7 @@ std::vector<ui64> TLogsSharding::MakeHashes(const std::shared_ptr<arrow::RecordB return out; } -ui64 TLogsSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const { +ui64 TLogsSharding::CalcHash(const NYql::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const { YQL_ENSURE(false); return 0; } diff --git a/ydb/core/tx/sharding/sharding.h b/ydb/core/tx/sharding/sharding.h index 6c093ce301b..5c6101a640c 100644 --- a/ydb/core/tx/sharding/sharding.h +++ b/ydb/core/tx/sharding/sharding.h @@ -5,7 +5,6 @@ #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/protos/flat_scheme_op.pb.h> -#include <ydb/library/yql/public/udf/udf_value.h> #include <ydb/library/accessor/accessor.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> @@ -13,46 +12,14 @@ #include <type_traits> -namespace NKikimr::NMiniKQL { -class TStructType; +namespace NYql::NUdf { +class TUnboxedValue; } namespace NKikimr::NSharding { -struct TExternalTableColumn { - ui32 Id; - NScheme::TTypeInfo Type; - TString TypeMod; -}; - -struct TColumnUnboxedPlaceInfo: public TExternalTableColumn { -private: - using TBase = TExternalTableColumn; -public: - const ui32 Idx; - const TString Name; - - TColumnUnboxedPlaceInfo(const TExternalTableColumn& baseInfo, const ui32 idx, const TString& name) - : TBase(baseInfo) - , Idx(idx) - , Name(name) { - - } -}; - -class TUnboxedValueReader { -private: - YDB_READONLY_DEF(std::vector<TColumnUnboxedPlaceInfo>, ColumnsInfo); - template <class T> - static void FieldToHashString(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) { - static_assert(std::is_arithmetic<T>::value); - const T result = value.Get<T>(); - hashCalcer.Update((const ui8*)&result, sizeof(result)); - } -public: - void BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const; - TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns); -}; +class TUnboxedValueReader; +struct TExternalTableColumn; class TShardingBase { private: @@ -66,10 +33,10 @@ public: static std::unique_ptr<TShardingBase> BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& shardingInfo); virtual const std::vector<TString>& GetShardingColumns() const = 0; - ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { + ui32 CalcShardId(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const { return CalcHash(value, readerInfo) % ShardsCount; } - virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0; + virtual ui64 CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0; virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const; virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; @@ -113,7 +80,7 @@ public: return ShardingColumns; } - virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; + virtual ui64 CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; }; // KIKIMR-11529 @@ -155,7 +122,7 @@ public: return ShardingColumns; } - virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; + virtual ui64 CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override; }; diff --git a/ydb/core/tx/sharding/unboxed_reader.cpp b/ydb/core/tx/sharding/unboxed_reader.cpp new file mode 100644 index 00000000000..4a0a178e88b --- /dev/null +++ b/ydb/core/tx/sharding/unboxed_reader.cpp @@ -0,0 +1,50 @@ +#include "unboxed_reader.h" +#include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/minikql/mkql_node.h> + +namespace NKikimr::NSharding { + +void TUnboxedValueReader::BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const { + for (auto&& i : ColumnsInfo) { + auto columnValue = value.GetElement(i.Idx); + if (columnValue.IsString()) { + hashCalcer.Update((const ui8*)columnValue.AsStringRef().Data(), columnValue.AsStringRef().Size()); + } else if (columnValue.IsEmbedded()) { + switch (i.Type.GetTypeId()) { + case NScheme::NTypeIds::Uint16: + FieldToHashString<ui16>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Uint32: + FieldToHashString<ui32>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Uint64: + FieldToHashString<ui64>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Int16: + FieldToHashString<i16>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Int32: + FieldToHashString<i32>(columnValue, hashCalcer); + continue; + case NScheme::NTypeIds::Int64: + FieldToHashString<i64>(columnValue, hashCalcer); + continue; + } + YQL_ENSURE(false, "incorrect column type for shard calculation"); + } else { + YQL_ENSURE(false, "incorrect column type for shard calculation"); + } + } +} + +TUnboxedValueReader::TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, + const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns) { + YQL_ENSURE(shardingColumns.size()); + for (auto&& i : shardingColumns) { + auto it = columnsRemap.find(i); + YQL_ENSURE(it != columnsRemap.end()); + ColumnsInfo.emplace_back(TColumnUnboxedPlaceInfo(it->second, structInfo->GetMemberIndex(i), i)); + } +} + +} diff --git a/ydb/core/tx/sharding/unboxed_reader.h b/ydb/core/tx/sharding/unboxed_reader.h new file mode 100644 index 00000000000..e9c5ffa6c1d --- /dev/null +++ b/ydb/core/tx/sharding/unboxed_reader.h @@ -0,0 +1,49 @@ +#pragma once +#include "hash.h" +#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/scheme_types/scheme_type_info.h> +#include <util/generic/map.h> + +namespace NKikimr::NMiniKQL { +class TStructType; +} + +namespace NKikimr::NSharding { + +struct TExternalTableColumn { + ui32 Id; + NScheme::TTypeInfo Type; + TString TypeMod; +}; + +struct TColumnUnboxedPlaceInfo: public TExternalTableColumn { +private: + using TBase = TExternalTableColumn; +public: + const ui32 Idx; + const TString Name; + + TColumnUnboxedPlaceInfo(const TExternalTableColumn& baseInfo, const ui32 idx, const TString& name) + : TBase(baseInfo) + , Idx(idx) + , Name(name) { + + } +}; + +class TUnboxedValueReader { +private: + YDB_READONLY_DEF(std::vector<TColumnUnboxedPlaceInfo>, ColumnsInfo); + template <class T> + static void FieldToHashString(const NYql::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) { + static_assert(std::is_arithmetic<T>::value); + const T result = value.Get<T>(); + hashCalcer.Update((const ui8*)&result, sizeof(result)); + } +public: + void BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const; + TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns); +}; + +} |