summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-05-05 21:03:22 +0300
committerivanmorozov <[email protected]>2023-05-05 21:03:22 +0300
commit3b5a0620ae4bf7b16da5934b17f2ae4b6851865c (patch)
treefcc901dfbee6070ce820843d3f64f7a94e493bc0
parentcffea5310713f2f5cfc5bd130e90aa49efc6d6ee (diff)
ridetech logs processing, external tiers info, etc
-rw-r--r--ydb/core/kqp/common/kqp_resolve.h1
-rw-r--r--ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/sharding/sharding.cpp49
-rw-r--r--ydb/core/tx/sharding/sharding.h49
-rw-r--r--ydb/core/tx/sharding/unboxed_reader.cpp50
-rw-r--r--ydb/core/tx/sharding/unboxed_reader.h49
9 files changed, 115 insertions, 87 deletions
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h
index ffc5eb7f577..6341284bfdb 100644
--- a/ydb/core/kqp/common/kqp_resolve.h
+++ b/ydb/core/kqp/common/kqp_resolve.h
@@ -2,6 +2,7 @@
#include <ydb/core/engine/mkql_keys.h>
#include <ydb/core/tx/sharding/sharding.h>
+#include <ydb/core/tx/sharding/unboxed_reader.h>
#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
#include <ydb/core/protos/kqp_physical.pb.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
diff --git a/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt
index cab50904fd5..08200fb7797 100644
--- a/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/sharding/CMakeLists.darwin-x86_64.txt
@@ -25,4 +25,5 @@ target_sources(core-tx-sharding PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp
)
diff --git a/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt
index 5e702794935..d0d22a1949a 100644
--- a/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/sharding/CMakeLists.linux-aarch64.txt
@@ -26,4 +26,5 @@ target_sources(core-tx-sharding PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp
)
diff --git a/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt b/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt
index 5e702794935..d0d22a1949a 100644
--- a/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/sharding/CMakeLists.linux-x86_64.txt
@@ -26,4 +26,5 @@ target_sources(core-tx-sharding PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp
)
diff --git a/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt b/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt
index cab50904fd5..08200fb7797 100644
--- a/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/sharding/CMakeLists.windows-x86_64.txt
@@ -25,4 +25,5 @@ target_sources(core-tx-sharding PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/sharding.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/xx_hash.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/sharding/unboxed_reader.cpp
)
diff --git a/ydb/core/tx/sharding/sharding.cpp b/ydb/core/tx/sharding/sharding.cpp
index 4e2432a0d04..7f70bfd0234 100644
--- a/ydb/core/tx/sharding/sharding.cpp
+++ b/ydb/core/tx/sharding/sharding.cpp
@@ -1,7 +1,7 @@
#include "sharding.h"
#include "xx_hash.h"
+#include "unboxed_reader.h"
#include <ydb/library/yql/utils/yql_panic.h>
-#include <ydb/library/yql/minikql/mkql_node.h>
#include <util/string/join.h>
namespace NKikimr::NSharding {
@@ -32,49 +32,6 @@ void TShardingBase::AppendField(const std::shared_ptr<arrow::Array>& array, int
});
}
-void TUnboxedValueReader::BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const {
- for (auto&& i : ColumnsInfo) {
- auto columnValue = value.GetElement(i.Idx);
- if (columnValue.IsString()) {
- hashCalcer.Update((const ui8*)columnValue.AsStringRef().Data(), columnValue.AsStringRef().Size());
- } else if (columnValue.IsEmbedded()) {
- switch (i.Type.GetTypeId()) {
- case NScheme::NTypeIds::Uint16:
- FieldToHashString<ui16>(columnValue, hashCalcer);
- continue;
- case NScheme::NTypeIds::Uint32:
- FieldToHashString<ui32>(columnValue, hashCalcer);
- continue;
- case NScheme::NTypeIds::Uint64:
- FieldToHashString<ui64>(columnValue, hashCalcer);
- continue;
- case NScheme::NTypeIds::Int16:
- FieldToHashString<i16>(columnValue, hashCalcer);
- continue;
- case NScheme::NTypeIds::Int32:
- FieldToHashString<i32>(columnValue, hashCalcer);
- continue;
- case NScheme::NTypeIds::Int64:
- FieldToHashString<i64>(columnValue, hashCalcer);
- continue;
- }
- YQL_ENSURE(false, "incorrect column type for shard calculation");
- } else {
- YQL_ENSURE(false, "incorrect column type for shard calculation");
- }
- }
-}
-
-TUnboxedValueReader::TUnboxedValueReader(const NMiniKQL::TStructType* structInfo,
- const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns) {
- YQL_ENSURE(shardingColumns.size());
- for (auto&& i : shardingColumns) {
- auto it = columnsRemap.find(i);
- YQL_ENSURE(it != columnsRemap.end());
- ColumnsInfo.emplace_back(TColumnUnboxedPlaceInfo(it->second, structInfo->GetMemberIndex(i), i));
- }
-}
-
std::unique_ptr<TShardingBase> TShardingBase::BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& sharding) {
if (sharding.HasHashSharding()) {
auto& hashSharding = sharding.GetHashSharding();
@@ -133,7 +90,7 @@ std::vector<ui64> THashSharding::MakeHashes(const std::shared_ptr<arrow::RecordB
return out;
}
-ui64 THashSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
+ui64 THashSharding::CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
TStreamStringHashCalcer hashCalcer(Seed);
hashCalcer.Start();
readerInfo.BuildStringForHash(value, hashCalcer);
@@ -198,7 +155,7 @@ std::vector<ui64> TLogsSharding::MakeHashes(const std::shared_ptr<arrow::RecordB
return out;
}
-ui64 TLogsSharding::CalcHash(const NKikimr::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const {
+ui64 TLogsSharding::CalcHash(const NYql::NUdf::TUnboxedValue& /*value*/, const TUnboxedValueReader& /*readerInfo*/) const {
YQL_ENSURE(false);
return 0;
}
diff --git a/ydb/core/tx/sharding/sharding.h b/ydb/core/tx/sharding/sharding.h
index 6c093ce301b..5c6101a640c 100644
--- a/ydb/core/tx/sharding/sharding.h
+++ b/ydb/core/tx/sharding/sharding.h
@@ -5,7 +5,6 @@
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/accessor/accessor.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
@@ -13,46 +12,14 @@
#include <type_traits>
-namespace NKikimr::NMiniKQL {
-class TStructType;
+namespace NYql::NUdf {
+class TUnboxedValue;
}
namespace NKikimr::NSharding {
-struct TExternalTableColumn {
- ui32 Id;
- NScheme::TTypeInfo Type;
- TString TypeMod;
-};
-
-struct TColumnUnboxedPlaceInfo: public TExternalTableColumn {
-private:
- using TBase = TExternalTableColumn;
-public:
- const ui32 Idx;
- const TString Name;
-
- TColumnUnboxedPlaceInfo(const TExternalTableColumn& baseInfo, const ui32 idx, const TString& name)
- : TBase(baseInfo)
- , Idx(idx)
- , Name(name) {
-
- }
-};
-
-class TUnboxedValueReader {
-private:
- YDB_READONLY_DEF(std::vector<TColumnUnboxedPlaceInfo>, ColumnsInfo);
- template <class T>
- static void FieldToHashString(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) {
- static_assert(std::is_arithmetic<T>::value);
- const T result = value.Get<T>();
- hashCalcer.Update((const ui8*)&result, sizeof(result));
- }
-public:
- void BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const;
- TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns);
-};
+class TUnboxedValueReader;
+struct TExternalTableColumn;
class TShardingBase {
private:
@@ -66,10 +33,10 @@ public:
static std::unique_ptr<TShardingBase> BuildShardingOperator(const NKikimrSchemeOp::TColumnTableSharding& shardingInfo);
virtual const std::vector<TString>& GetShardingColumns() const = 0;
- ui32 CalcShardId(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
+ ui32 CalcShardId(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const {
return CalcHash(value, readerInfo) % ShardsCount;
}
- virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0;
+ virtual ui64 CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const = 0;
virtual std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch) const;
virtual std::vector<ui64> MakeHashes(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
@@ -113,7 +80,7 @@ public:
return ShardingColumns;
}
- virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
+ virtual ui64 CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
};
// KIKIMR-11529
@@ -155,7 +122,7 @@ public:
return ShardingColumns;
}
- virtual ui64 CalcHash(const NKikimr::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
+ virtual ui64 CalcHash(const NYql::NUdf::TUnboxedValue& value, const TUnboxedValueReader& readerInfo) const override;
};
diff --git a/ydb/core/tx/sharding/unboxed_reader.cpp b/ydb/core/tx/sharding/unboxed_reader.cpp
new file mode 100644
index 00000000000..4a0a178e88b
--- /dev/null
+++ b/ydb/core/tx/sharding/unboxed_reader.cpp
@@ -0,0 +1,50 @@
+#include "unboxed_reader.h"
+#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+
+namespace NKikimr::NSharding {
+
+void TUnboxedValueReader::BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const {
+ for (auto&& i : ColumnsInfo) {
+ auto columnValue = value.GetElement(i.Idx);
+ if (columnValue.IsString()) {
+ hashCalcer.Update((const ui8*)columnValue.AsStringRef().Data(), columnValue.AsStringRef().Size());
+ } else if (columnValue.IsEmbedded()) {
+ switch (i.Type.GetTypeId()) {
+ case NScheme::NTypeIds::Uint16:
+ FieldToHashString<ui16>(columnValue, hashCalcer);
+ continue;
+ case NScheme::NTypeIds::Uint32:
+ FieldToHashString<ui32>(columnValue, hashCalcer);
+ continue;
+ case NScheme::NTypeIds::Uint64:
+ FieldToHashString<ui64>(columnValue, hashCalcer);
+ continue;
+ case NScheme::NTypeIds::Int16:
+ FieldToHashString<i16>(columnValue, hashCalcer);
+ continue;
+ case NScheme::NTypeIds::Int32:
+ FieldToHashString<i32>(columnValue, hashCalcer);
+ continue;
+ case NScheme::NTypeIds::Int64:
+ FieldToHashString<i64>(columnValue, hashCalcer);
+ continue;
+ }
+ YQL_ENSURE(false, "incorrect column type for shard calculation");
+ } else {
+ YQL_ENSURE(false, "incorrect column type for shard calculation");
+ }
+ }
+}
+
+TUnboxedValueReader::TUnboxedValueReader(const NMiniKQL::TStructType* structInfo,
+ const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns) {
+ YQL_ENSURE(shardingColumns.size());
+ for (auto&& i : shardingColumns) {
+ auto it = columnsRemap.find(i);
+ YQL_ENSURE(it != columnsRemap.end());
+ ColumnsInfo.emplace_back(TColumnUnboxedPlaceInfo(it->second, structInfo->GetMemberIndex(i), i));
+ }
+}
+
+}
diff --git a/ydb/core/tx/sharding/unboxed_reader.h b/ydb/core/tx/sharding/unboxed_reader.h
new file mode 100644
index 00000000000..e9c5ffa6c1d
--- /dev/null
+++ b/ydb/core/tx/sharding/unboxed_reader.h
@@ -0,0 +1,49 @@
+#pragma once
+#include "hash.h"
+#include <ydb/library/yql/public/udf/udf_value.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/scheme_types/scheme_type_info.h>
+#include <util/generic/map.h>
+
+namespace NKikimr::NMiniKQL {
+class TStructType;
+}
+
+namespace NKikimr::NSharding {
+
+struct TExternalTableColumn {
+ ui32 Id;
+ NScheme::TTypeInfo Type;
+ TString TypeMod;
+};
+
+struct TColumnUnboxedPlaceInfo: public TExternalTableColumn {
+private:
+ using TBase = TExternalTableColumn;
+public:
+ const ui32 Idx;
+ const TString Name;
+
+ TColumnUnboxedPlaceInfo(const TExternalTableColumn& baseInfo, const ui32 idx, const TString& name)
+ : TBase(baseInfo)
+ , Idx(idx)
+ , Name(name) {
+
+ }
+};
+
+class TUnboxedValueReader {
+private:
+ YDB_READONLY_DEF(std::vector<TColumnUnboxedPlaceInfo>, ColumnsInfo);
+ template <class T>
+ static void FieldToHashString(const NYql::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) {
+ static_assert(std::is_arithmetic<T>::value);
+ const T result = value.Get<T>();
+ hashCalcer.Update((const ui8*)&result, sizeof(result));
+ }
+public:
+ void BuildStringForHash(const NKikimr::NUdf::TUnboxedValue& value, IHashCalcer& hashCalcer) const;
+ TUnboxedValueReader(const NMiniKQL::TStructType* structInfo, const TMap<TString, TExternalTableColumn>& columnsRemap, const std::vector<TString>& shardingColumns);
+};
+
+}