aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraakulaga <aakulaga@ydb.tech>2022-11-28 10:29:34 +0300
committeraakulaga <aakulaga@ydb.tech>2022-11-28 10:29:34 +0300
commit45aa07c0d3dea21982be1e7902b27d040d2e655f (patch)
treebc2222a5eca3d7675124e15c59a85402915b3566
parentd40a37c8cd5a5eece929900c7b8016c17009c774 (diff)
downloadydb-45aa07c0d3dea21982be1e7902b27d040d2e655f.tar.gz
PGSQL types added
PGSQL types added
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp96
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp270
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h49
3 files changed, 333 insertions, 82 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index c431c1a8a48..75a6b96b2a3 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -15,6 +15,7 @@
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <chrono>
@@ -31,6 +32,9 @@ struct TColumnDataPackInfo {
NUdf::EDataSlot DataType = NUdf::EDataSlot::Uint32; // Data type of the column for standard types (TDataType)
bool IsKeyColumn = false; // True if this columns is key for join
bool IsString = false; // True if value is string
+ bool IsPgType = false; // True if column is PG type
+ bool IsPresortSupported = false; // True if pg type supports presort and can be interpreted as string value
+ bool IsIType = false; // True if column need to be processed via IHash, IEquate interfaces
ui32 Offset = 0; // Offset of column in packed data
// TValuePacker Packer; // Packer for composite data types
};
@@ -53,14 +57,19 @@ struct TGraceJoinPacker {
std::vector<NUdf::TUnboxedValue> TupleHolder; // Storage for tuple data
std::vector<NUdf::TUnboxedValue*> TuplePtrs; // Storage for tuple data pointers to use in FetchValues
std::vector<std::string> TupleStringHolder; // Storage for complex tuple data types serialized to strings
+ std::vector<NUdf::TUnboxedValue> IColumnsHolder; // Storage for interface-based types (IHash, IEquate)
GraceJoin::TupleData JoinTupleData; // TupleData to get join results
ui64 TotalColumnsNum = 0; // Total number of columns to pack
ui64 TotalIntColumnsNum = 0; // Total number of int columns
ui64 TotalStrColumnsNum = 0; // Total number of string columns
+ ui64 TotalIColumnsNum = 0; // Total number of interface-based columns
ui64 KeyIntColumnsNum = 0; // Total number of key int columns
ui64 KeyStrColumnsNum = 0; // Total number of key string columns
+ ui64 KeyIColumnsNum = 0; // Total number of interface-based columns
ui64 DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum;
ui64 DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
+ ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
+ std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces;
inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings
inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs
TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory);
@@ -81,11 +90,25 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
colType = type;
}
+ if (type->GetKind() == TType::EKind::Pg ) {
+
+ TPgType* pgType = AS_TYPE(TPgType, type);
+
+ res.IsPgType = true;
+ if (pgType->IsPresortSupported()) {
+ res.IsPresortSupported = true;
+ res.IsString = true;
+ res.DataType = NUdf::EDataSlot::String;
+ } else {
+ res.IsIType = true;
+ }
+ return res;
+ }
+
if (colType->GetKind() != TType::EKind::Data) {
- MKQL_ENSURE(false, "Unknown data type.");
res.IsString = true;
res.DataType = NUdf::EDataSlot::String;
- return;
+ return res;
}
colTypeId = AS_TYPE(TDataType, colType)->GetSchemeType();
@@ -103,7 +126,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
case NUdf::EDataSlot::Int16:
res.Bytes = sizeof(i16); break;
case NUdf::EDataSlot::Uint16:
- res.Bytes = sizeof(i16); break;
+ res.Bytes = sizeof(ui16); break;
case NUdf::EDataSlot::Int32:
res.Bytes = sizeof(i32); break;
case NUdf::EDataSlot::Uint32:
@@ -117,7 +140,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
case NUdf::EDataSlot::Double:
res.Bytes = sizeof(double); break;
case NUdf::EDataSlot::Date:
- res.Bytes = sizeof(ui64); break;
+ res.Bytes = sizeof(ui16); break;
case NUdf::EDataSlot::Datetime:
res.Bytes = sizeof(ui32); break;
case NUdf::EDataSlot::Timestamp:
@@ -136,6 +159,12 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
res.Bytes = 16; break;
case NUdf::EDataSlot::String:
res.IsString = true; break;
+ case NUdf::EDataSlot::Utf8:
+ res.IsString = true; break;
+ case NUdf::EDataSlot::Yson:
+ res.IsString = true; break;
+ case NUdf::EDataSlot::Json:
+ res.IsString = true; break;
default:
{
MKQL_ENSURE(false, "Unknown data type.");
@@ -176,10 +205,14 @@ void TGraceJoinPacker::Pack() {
}
if (colType->GetKind() != TType::EKind::Data) {
- TStringBuf strBuf = Packers[i].Pack(value);
- TupleStringHolder[i] = strBuf;
- TupleStrings[offset] = TupleStringHolder[i].data();
- TupleStrSizes[offset] = TupleStringHolder[i].size();
+ if (pi.IsIType ) { // Interface-based type
+ IColumnsHolder[offset] = value;
+ } else {
+ TStringBuf strBuf = Packers[i].Pack(value);
+ TupleStringHolder[i] = strBuf;
+ TupleStrings[offset] = TupleStringHolder[i].data();
+ TupleStrSizes[offset] = TupleStringHolder[i].size();
+ }
continue;
}
@@ -285,6 +318,10 @@ void TGraceJoinPacker::UnPack() {
}
if (colType->GetKind() != TType::EKind::Data) {
+ if (colType->GetKind() == TType::EKind::Pg) {
+ value = IColumnsHolder[offset];
+ continue;
+ }
value = Packers[i].Unpack(TStringBuf(TupleStrings[offset], TupleStrSizes[offset]), HolderFactory);
continue;
}
@@ -395,21 +432,29 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
nColumns = ColumnsPackInfo.size();
- ui64 totalIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return !a.IsString; });
- ui64 totalStrColumnsNum = nColumns - totalIntColumnsNum;
+ ui64 totalIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return !a.IsString && !a.IsPgType; });
+ ui64 totalIColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return a.IsIType; });
+ ui64 totalStrColumnsNum = nColumns - totalIntColumnsNum - totalIColumnsNum;
- ui64 keyIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && !a.IsString );});
- ui64 keyStrColumnsNum = nKeyColumns - keyIntColumnsNum;
+ ui64 keyIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && !a.IsString && !a.IsPgType);});
+ ui64 keyIColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && a.IsIType);});
+ ui64 keyStrColumnsNum = nKeyColumns - keyIntColumnsNum - keyIColumnsNum;
ui64 dataIntColumnsNum = totalIntColumnsNum - keyIntColumnsNum;
ui64 dataStrColumnsNum = totalStrColumnsNum - keyStrColumnsNum;
TotalColumnsNum = nColumns;
TotalIntColumnsNum = totalIntColumnsNum;
TotalStrColumnsNum = totalStrColumnsNum;
+ TotalIColumnsNum = totalIColumnsNum;
+
+
KeyIntColumnsNum = keyIntColumnsNum;
KeyStrColumnsNum = keyStrColumnsNum;
+ KeyIColumnsNum = keyIColumnsNum;
+
DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum;
DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
+ DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ;
@@ -432,27 +477,44 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
Offsets.resize(nColumns);
TupleHolder.resize(nColumns);
TupleStringHolder.resize(nColumns);
+ IColumnsHolder.resize(nColumns);
+
+ JoinTupleData.IColumns = IColumnsHolder.data();
std::transform(TupleHolder.begin(), TupleHolder.end(), std::back_inserter(TuplePtrs), [](NUdf::TUnboxedValue& v) { return std::addressof(v); });
ui32 currIntOffset = NullsBitmapSize * sizeof(ui64) ;
ui32 currStrOffset = 0;
+ ui32 currIOffset = 0;
ui32 currIdx = 0;
+ std::vector<GraceJoin::TColTypeInterface> ctiv;
for( auto & p: ColumnsPackInfo ) {
- if ( !p.IsString ) {
+ if ( !p.IsString && !p.IsIType ) {
p.Offset = currIntOffset;
Offsets[p.ColumnIdx] = currIntOffset;
currIntOffset += p.Bytes;
- } else {
+ } else if ( p.IsString ) {
p.Offset = currStrOffset;
Offsets[p.ColumnIdx] = currStrOffset;
currStrOffset++;
+ } else if (p.IsIType) {
+ p.Offset = currIOffset;
+ Offsets[p.ColumnIdx] = currIOffset;
+ currIOffset++;
+ GraceJoin::TColTypeInterface cti{ MakeHashImpl(p.MKQLType), MakeEquateImpl(p.MKQLType), TValuePacker(true, p.MKQLType) , HolderFactory };
+ ColumnInterfaces.push_back(cti);
}
currIdx++;
}
- TablePtr = std::make_unique<GraceJoin::TTable>(keyIntColumnsNum, keyStrColumnsNum, dataIntColumnsNum, dataStrColumnsNum);
+ GraceJoin::TColTypeInterface * cti_p = nullptr;
+
+ if (TotalIColumnsNum > 0 ) {
+ cti_p = ColumnInterfaces.data();
+ }
+
+ TablePtr = std::make_unique<GraceJoin::TTable>(KeyIntColumnsNum, KeyStrColumnsNum, DataIntColumnsNum, DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, cti_p );
}
@@ -653,7 +715,7 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
LeftPacker->StartTime = std::chrono::system_clock::now();
}
LeftPacker->Pack();
- LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data());
+ LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data(), LeftPacker->IColumnsHolder.data());
}
if (resultRight == EFetchResult::One) {
@@ -662,7 +724,7 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
}
RightPacker->Pack();
- RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data());
+ RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data());
}
if (resultLeft == EFetchResult::Yield || resultRight == EFetchResult::Yield) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
index ef96dc1d7dd..248c2d6faf6 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -13,24 +13,37 @@ namespace GraceJoin {
-void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes ) {
+void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes, NYql::NUdf::TUnboxedValue * iColumns ) {
TotalPacked++;
TempTuple.clear();
TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize + NumberOfKeyIntColumns);
+ if ( NumberOfKeyIColumns > 0 ) {
+ for (ui32 i = 0; i < NumberOfKeyIColumns; i++) {
+ TempTuple.push_back((ColInterfaces + i)->HashI->Hash(*(iColumns+i)));
+ }
+ }
+
ui64 totalBytesForStrings = 0;
ui64 totalIntsForStrings = 0;
// Processing variable length string columns
- if ( NumberOfKeyStringColumns != 0 ) {
+ if ( NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
for( ui64 i = 0; i < NumberOfKeyStringColumns; i++ ) {
totalBytesForStrings += stringsSizes[i];
}
+ for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
+
+ TStringBuf val = (ColInterfaces + i)->Packer.Pack(*(iColumns+i));
+ IColumnsVals[i] = val;
+ totalBytesForStrings += val.size();
+ }
+
totalIntsForStrings = (totalBytesForStrings + sizeof(ui64) - 1) / sizeof(ui64);
TempTuple.push_back(totalIntsForStrings);
@@ -46,9 +59,13 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
currStrPtr+=stringsSizes[i];
}
- }
+ for( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
+ std::memcpy(currStrPtr, IColumnsVals[i].data(), IColumnsVals[i].size() );
+ currStrPtr+=IColumnsVals[i].size();
+ }
+ }
XXH64_hash_t hash = XXH64(TempTuple.data(), TempTuple.size() * sizeof(ui64), 0);
@@ -70,15 +87,29 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
keyIntVals.push_back(hash);
keyIntVals.insert(keyIntVals.end(), TempTuple.begin(), TempTuple.end());
- if (NumberOfStringColumns) {
+ if (NumberOfStringColumns || NumberOfIColumns ) {
stringsOffsets.push_back(offset); // Adding offset to tuple in keyIntVals vector
stringsOffsets.push_back(stringVals.size()); // Adding offset to string values
+
// Adding strings sizes for keys and data
- stringsOffsets.insert( stringsOffsets.end(), stringsSizes, stringsSizes+NumberOfStringColumns );
+ if ( NumberOfStringColumns ) {
+ stringsOffsets.insert( stringsOffsets.end(), stringsSizes, stringsSizes+NumberOfStringColumns );
+ }
+
+ if ( NumberOfIColumns ) {
+ for ( ui64 i = NumberOfKeyIColumns - 1; i < NumberOfIColumns; i++) {
+ TStringBuf val = (ColInterfaces + i)->Packer.Pack(*(iColumns+i));
+ IColumnsVals[i] = val;
+ }
+ for (ui64 i = 0; i < NumberOfIColumns; i++ ) {
+ stringsOffsets.push_back(IColumnsVals[i].size());
+ }
+ }
}
+
// Adding data values
ui64 * dataColumns = intColumns + NullsBitmapSize + NumberOfKeyIntColumns;
dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns);
@@ -87,9 +118,14 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
char ** dataStringsColumns = stringColumns + NumberOfKeyStringColumns;
ui32 * dataStringsSizes = stringsSizes + NumberOfKeyStringColumns;
- for( ui64 i = 0; i < NumberOfDataStringColumns; i++) {
- ui32 currStringSize = *(dataStringsSizes + i);
- stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize);
+ for( ui64 i = 0; i < NumberOfDataStringColumns; i++) {
+ ui32 currStringSize = *(dataStringsSizes + i);
+ stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize);
+ }
+
+ for ( ui64 i = 0; i < NumberOfDataIColumns; i++) {
+ stringVals.insert( stringVals.end(), IColumnsVals[NumberOfKeyIColumns + i].cbegin(), IColumnsVals[NumberOfKeyIColumns + i].end());
+
}
@@ -152,6 +188,44 @@ inline bool HasBitSet( ui64 * buf, ui64 Nbits ) {
return ((*buf) << (sizeof(ui64) - Nbits));
}
+
+inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1,
+ const ui32* stringSizes2, const char * vals2,
+ TColTypeInterface * colInterfaces, ui64 nStringColumns, ui64 nIColumns) {
+ ui32 currOffset1 = 0;
+ ui32 currOffset2 = 0;
+ char * currVal1 = 0;
+ char * currVal2 = 0;
+ ui32 currSize1 = 0;
+ ui32 currSize2 = 0;
+ NYql::NUdf::TUnboxedValue val1, val2;
+ TStringBuf str1, str2;
+
+ for (ui32 i = 0; i < nStringColumns; i ++) {
+ currSize1 = *(stringSizes1 + i);
+ currSize2 = *(stringSizes2 + i);
+ currOffset1 += currSize1;
+ currOffset2 += currSize2;
+ }
+ for (ui32 i = 0; i < nIColumns; i ++) {
+
+ currSize1 = *(stringSizes1 + nStringColumns + i );
+ currSize2 = *(stringSizes2 + nStringColumns + i );
+ str1 = TStringBuf(vals1 + currOffset1, currSize1);
+ val1 = (colInterfaces + i)->Packer.Unpack(str1, colInterfaces->HolderFactory);
+ str2 = TStringBuf(vals2 + currOffset2, currSize2 );
+ val2 = (colInterfaces + i)->Packer.Unpack(str2, colInterfaces->HolderFactory);
+ if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) {
+ return false;
+ }
+
+ currOffset1 += currSize1;
+ currOffset2 += currSize2;
+
+ }
+ return true;
+}
+
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator
void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
@@ -174,9 +248,12 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
ui64 tuplesFound = 0;
std::vector<ui64> joinSlots, spillSlots, slotToIdx;
+ std::vector<ui32> stringsOffsets1, stringsOffsets2;
ui64 reservedSize = 6 * (DefaultTupleBytes * DefaultTuplesNum) / sizeof(ui64);
joinSlots.reserve( reservedSize );
spillSlots.reserve( reservedSize );
+ stringsOffsets1.reserve(JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 1);
+ stringsOffsets2.reserve(JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 1);
std::vector < JoinTuplesIds > joinResults;
@@ -196,9 +273,10 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
ui64 headerSize = JoinTable1->HeaderSize;
ui64 slotSize = headerSize;
- if (JoinTable1->NumberOfKeyStringColumns != 0) {
- ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - bucket2->TuplesNum * headerSize) ) / ( 2 * bucket2->TuplesNum + 1) + 1;
- slotSize = headerSize + avgStringsSize;
+ ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - bucket2->TuplesNum * headerSize) ) / ( 2 * bucket2->TuplesNum + 1) + 1;
+
+ if (JoinTable1->NumberOfKeyStringColumns != 0 || JoinTable1->NumberOfKeyIColumns != 0) {
+ slotSize = slotSize + avgStringsSize;
}
ui64 nSlots = 3 * bucket2->TuplesNum + 1;
@@ -211,7 +289,12 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
ui32 tuple2Idx = 0;
auto it2 = bucket2->KeyIntVals.begin();
while (it2 != bucket2->KeyIntVals.end() ) {
- ui64 valSize = (!JoinTable2->NumberOfKeyStringColumns) ? headerSize : headerSize + *(it2 + headerSize - JoinTable2->TotalStringsSize) ;
+ ui64 keysValSize;
+ if ( JoinTable2->NumberOfKeyStringColumns > 0 || JoinTable2->NumberOfKeyIColumns > 0) {
+ keysValSize = headerSize + *(it2 + headerSize - 1) ;
+ } else {
+ keysValSize = headerSize;
+ }
ui64 hash = *it2;
ui64 * nullsPtr = it2+1;
if (!HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns))
@@ -227,21 +310,21 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
slotIt = joinSlots.begin();
}
- if (valSize <= slotSize)
+ if (keysValSize <= slotSize)
{
- std::copy_n(it2, valSize, slotIt);
+ std::copy_n(it2, keysValSize, slotIt);
}
else
{
std::copy_n(it2, headerSize, slotIt);
ui64 stringsPos = spillSlots.size();
- spillSlots.insert(spillSlots.end(), it2 + headerSize, it2 + headerSize + *(it2 + headerSize - JoinTable2->TotalStringsSize));
+ spillSlots.insert(spillSlots.end(), it2 + headerSize, it2 + keysValSize);
*(slotIt + headerSize) = stringsPos;
}
ui64 currSlotNum = (slotIt - joinSlots.begin()) / slotSize;
slotToIdx[currSlotNum] = tuple2Idx;
}
- it2 += valSize;
+ it2 += keysValSize;
tuple2Idx ++;
}
@@ -250,39 +333,97 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
auto it1 = bucket1->KeyIntVals.begin();
while ( it1 < bucket1->KeyIntVals.end() ) {
- ui64 valSize = (!JoinTable1->NumberOfKeyStringColumns) ? headerSize : headerSize + *(it1 + headerSize - JoinTable1->TotalStringsSize);
+ ui64 keysValSize;
+ if ( JoinTable1->NumberOfKeyStringColumns > 0 || JoinTable1->NumberOfKeyIColumns > 0) {
+ keysValSize = headerSize + *(it1 + headerSize - 1) ;
+ } else {
+ keysValSize = headerSize;
+ }
+
ui64 hash = *it1;
ui64 * nullsPtr = it1+1;
- if (!HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns))
+ if (HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns))
{
- ui64 slotNum = hash % nSlots;
- auto slotIt = joinSlots.begin() + slotNum * slotSize;
- ui64 collisions = 0;
- while (*slotIt != 0 && slotIt != joinSlots.end())
- {
- bool matchFound = false;
- if (valSize <= slotSize)
- {
- if (std::equal(it1, it1 + valSize, slotIt))
- {
+ it1 += keysValSize;
+ tuple1Idx ++;
+ continue;
+ }
+
+ ui64 slotNum = hash % nSlots;
+ auto slotIt = joinSlots.begin() + slotNum * slotSize;
+ ui64 collisions = 0;
+ while (*slotIt != 0 && slotIt != joinSlots.end())
+ {
+ bool matchFound = false;
+ if (keysValSize <= slotSize && !JoinTable1->NumberOfKeyIColumns ) {
+ if (std::equal(it1, it1 + keysValSize, slotIt)) {
+ tuplesFound++;
+ matchFound = true;
+ }
+ }
+
+ if (keysValSize > slotSize && !JoinTable1->NumberOfKeyIColumns ) {
+ if (std::equal(it1, it1 + headerSize, slotIt)) {
+ ui64 stringsPos = *(slotIt + headerSize);
+ ui64 stringsSize = *(slotIt + headerSize - 1);
+ if (std::equal(it1 + headerSize, it1 + headerSize + stringsSize, spillSlots.begin() + stringsPos)) {
tuplesFound++;
matchFound = true;
}
}
- else
- {
- if (std::equal(it1, it1 + headerSize, slotIt))
- {
- ui64 stringsPos = *(slotIt + headerSize);
- ui64 stringsSize = *(slotIt + headerSize - 1);
- if (std::equal(it1 + headerSize, it1 + headerSize + stringsSize, spillSlots.begin() + stringsPos))
- {
- tuplesFound++;
- matchFound = true;
- }
+ }
+
+ if (JoinTable1->NumberOfKeyIColumns)
+ {
+ bool headerMatch = false;
+ bool stringsMatch = false;
+ bool iValuesMatch = false;
+
+ if (std::equal(it1, it1 + headerSize - 1, slotIt)) {
+ headerMatch = true;
+ }
+
+ auto slotStringsStart = slotIt + headerSize;
+
+ if (keysValSize > slotSize ) {
+ ui64 stringsPos = *(slotIt + headerSize);
+ slotStringsStart = spillSlots.begin() + stringsPos;
+ ui64 stringsSize = *(slotIt + headerSize - 1);
+
+ }
+
+ if ( JoinTable1->NumberOfKeyStringColumns == 0) {
+ stringsMatch = true;
+ } else {
+ if (headerMatch && std::equal( it1 + headerSize, it1 + headerSize + JoinTable1->NumberOfKeyStringColumns, slotStringsStart )) {
+ stringsMatch = true;
}
}
+ if (headerMatch && stringsMatch ) {
+
+
+ tuple2Idx = slotToIdx[(slotIt - joinSlots.begin()) / slotSize];
+ i64 stringsOffsetsIdx1 = tuple1Idx * (JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 2);
+ ui64 stringsOffsetsIdx2 = tuple2Idx * (JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 2);
+ ui32 * stringsSizesPtr1 = bucket1->StringsOffsets.data() + stringsOffsetsIdx1 + 2;
+ ui32 * stringsSizesPtr2 = bucket2->StringsOffsets.data() + stringsOffsetsIdx2 + 2;
+
+
+ iValuesMatch = CompareIColumns( stringsSizesPtr1 ,
+ (char *) (it1 + headerSize ),
+ stringsSizesPtr2,
+ (char *) (slotStringsStart),
+ JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns );
+ }
+
+ if (headerMatch && stringsMatch && iValuesMatch) {
+ tuplesFound++;
+ matchFound = true;
+ }
+
+ }
+
if (matchFound)
{
JoinTuplesIds joinIds;
@@ -299,9 +440,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
if (slotIt == joinSlots.end())
slotIt = joinSlots.begin();
}
- }
+
- it1 += valSize;
+ it1 += keysValSize;
tuple1Idx ++;
}
std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b)
@@ -337,9 +478,9 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
td.AllNulls = false;
TTableBucket & tb = TableBuckets[bucketNum];
- ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + 2);
+ ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2);
- if(NumberOfKeyStringColumns != 0) {
+ if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns !=0 ) {
keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx];
} else {
keyIntsOffset = HeaderSize * tupleId;
@@ -357,7 +498,7 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
}
char *strPtr = nullptr;
- if(NumberOfKeyStringColumns != 0) {
+ if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
keyStringsOffset = tb.StringsOffsets[stringsOffsetsIdx] + HeaderSize;
strPtr = reinterpret_cast<char *>(tb.KeyIntVals.data() + keyStringsOffset);
@@ -368,6 +509,14 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
td.StrSizes[i] = tb.StringsOffsets[stringsOffsetsIdx + 2 + i];
strPtr += td.StrSizes[i];
}
+
+ for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
+ ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + NumberOfKeyStringColumns + i];
+ *(td.IColumns + i) = (ColInterfaces + i)->Packer.Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory);
+ strPtr += currSize;
+ }
+
+
}
@@ -383,7 +532,7 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
strPtr += td.StrSizes[currIdx];
}
-
+
}
inline bool TTable::HasJoinedTupleId(TTable *joinedTable, ui32 &tupleId2) {
@@ -699,37 +848,48 @@ void TTable::Clear() {
// Creates new table with key columns and data columns
TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
- ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns ) :
+ ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns,
+ ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns, TColTypeInterface * colInterfaces ) :
NumberOfKeyIntColumns(numberOfKeyIntColumns),
NumberOfKeyStringColumns(numberOfKeyStringColumns),
NumberOfDataIntColumns(numberOfDataIntColumns),
- NumberOfDataStringColumns(numberOfDataStringColumns) {
+ NumberOfDataStringColumns(numberOfDataStringColumns),
+ NumberOfKeyIColumns(numberOfKeyIColumns),
+ NumberOfDataIColumns(numberOfDataIColumns),
+ ColInterfaces(colInterfaces) {
- NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns;
- NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns;
+ NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns;
+ NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns;
NumberOfColumns = NumberOfKeyColumns + NumberOfDataColumns;
NumberOfStringColumns = NumberOfKeyStringColumns + NumberOfDataStringColumns;
+ NumberOfIColumns = NumberOfKeyIColumns + NumberOfDataIColumns;
BytesInKeyIntColumns = NumberOfKeyIntColumns * sizeof(ui64);
NullsBitmapSize = NumberOfColumns / (8 * sizeof(ui64)) + 1;
- TotalStringsSize = (numberOfKeyStringColumns > 0 ) ? 1 : 0;
+ TotalStringsSize = (numberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0 ) ? 1 : 0;
- HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize;
+ HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize;
TableBuckets.resize(NumberOfBuckets);
const ui64 reservedSizePerTuple = (2 * DefaultTupleBytes) / sizeof(ui64);
TempTuple.reserve( reservedSizePerTuple );
+ IColumnsHashes.resize(NumberOfKeyIColumns);
+ IColumnsVals.resize(NumberOfIColumns);
+
+ const ui64 totalForTuples = DefaultTuplesNum * reservedSizePerTuple;
for ( auto & b: TableBuckets ) {
- b.KeyIntVals.reserve(DefaultTuplesNum * reservedSizePerTuple );
- b.StringsOffsets.reserve(DefaultTuplesNum * reservedSizePerTuple);
- b.DataIntVals.reserve( DefaultTuplesNum * reservedSizePerTuple);
- b.StringsValues.reserve( DefaultTuplesNum * reservedSizePerTuple);
+ b.KeyIntVals.reserve( (totalForTuples * NumberOfKeyColumns) / (NumberOfColumns + 1) );
+ b.StringsOffsets.reserve((totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1));
+ b.DataIntVals.reserve( (totalForTuples * NumberOfDataIntColumns) / (NumberOfColumns + 1));
+ b.StringsValues.reserve( (totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1) );
+ b.InterfaceOffsets.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1) );
+ b.InterfaceValues.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1));
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
index 934a897602b..a89c492e9ad 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -2,6 +2,8 @@
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/public/udf/udf_type_builder.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -15,10 +17,11 @@ const ui64 DefaultTupleBytes = 512; // Default size of all columns in table row
const ui64 HashSize = 1; // Using ui64 hash size
/*
-Table data stored in buckets. Table columns are interpreted either as integers or strings.
+Table data stored in buckets. Table columns are interpreted either as integers, strings or some interface-based type,
+providing IHash, IEquate, IPack and IUnpack functions.
External clients should transform (pack) data into appropriate presentation.
-Key columns always first, following int columns and string columns.
+Key columns always first, following int columns, string columns and interface-based columns.
Optimum presentation of table data is chosen based on locality to perform most
processing of related data in processor caches.
@@ -45,7 +48,10 @@ struct TTableBucket {
std::vector<ui64> KeyIntVals; // Vector to store table key values
std::vector<ui64> DataIntVals; // Vector to store data values in bucket
std::vector<char> StringsValues; // Vector to store data strings values
- std::vector<ui32> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple.
+ std::vector<ui32> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple.
+ std::vector<char> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces
+ std::vector<ui32> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
+
std::vector<JoinTuplesIds> JoinIds; // Results of join operations stored as index of tuples in buckets
// of two tables with the same number
std::vector<ui32> RightIds; // Sorted Ids of right table joined tuples to process full join and exclusion join
@@ -56,28 +62,44 @@ struct TupleData {
ui64 * IntColumns = nullptr; // Array of packed int data of the table. Caller should allocate array of NumberOfIntColumns size
char ** StrColumns = nullptr; // Pointers to values of strings for table. Strings are not null-terminated
ui32 * StrSizes = nullptr; // Sizes of strings for table.
+ NYql::NUdf::TUnboxedValue * IColumns = nullptr; // Array of TUboxedValues for interface-based columns. Caller should allocate array of required size.
bool AllNulls = false; // If tuple data contains all nulls (it is required for corresponding join types)
};
+// Interface to work with complex column types without "simple" byte-serialized representation (which can be used for keys comparison)
+struct TColTypeInterface {
+ NYql::NUdf::IHash::TPtr HashI = nullptr; // Interface to calculate hash of column value
+ NYql::NUdf::IEquate::TPtr EquateI = nullptr; // Interface to compare two column values
+ TValuePacker Packer; // Class to pack and unpack column values
+ const THolderFactory& HolderFactory; // To use during unpacking
+};
+
// Class which represents single table data stored in buckets
class TTable {
ui64 NumberOfKeyIntColumns = 0; // Key int columns always first and padded to sizeof(ui64).
- ui64 NumberOfKeyStringColumns = 0; // String key columns go after key int columns
+ ui64 NumberOfKeyStringColumns = 0; // String key columns go after key int columns
+ ui64 NumberOfKeyIColumns = 0; // Number of interface - provided key columns
+
+
+ ui64 NumberOfDataIntColumns = 0; //Number of integer data columns in the Table
+ ui64 NumberOfDataStringColumns = 0; // Number of strings data columns in the Table
+ ui64 NumberOfDataIColumns = 0; // Number of interface - provided data columns
+
+ TColTypeInterface * ColInterfaces = nullptr; // Array of interfaces to work with corresponding columns data
- ui64 NumberOfDataIntColumns = 0; //Number of integer columns in the Table
- ui64 NumberOfDataStringColumns = 0; // Number of strings columns in the Table
ui64 NumberOfColumns = 0; // Number of columns in the Table
ui64 NumberOfKeyColumns = 0; // Number of key columns in the Table
ui64 NumberOfDataColumns = 0; // Number of data columns in the Table
ui64 NumberOfStringColumns = 0; // Total number of String Columns
+ ui64 NumberOfIColumns = 0; // Total number of interface-based columns
ui64 NullsBitmapSize = 1; // Default size of ui64 values used for null columns bitmap.
// Every bit set means null value. Order of columns is equal to order in AddTuple call.
// First key int column is bit 0 in bit mask, second - bit 1, etc. Bit 0 is least significant in bitmask.
ui64 TotalStringsSize = 0; // Bytes in tuple header reserved to store total strings size key tuple columns
- ui64 HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize; // Header of all tuples size
+ ui64 HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size
ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns;
@@ -87,6 +109,12 @@ class TTable {
// Temporary vector for tuples manipulation;
std::vector<ui64> TempTuple;
+ // Hashes for interface - based columns values
+ std::vector<ui64> IColumnsHashes;
+
+ // Serialized values for interface-based columns
+ std::vector<TString> IColumnsVals;
+
// Current iterator index for NextTuple iterator
ui64 CurrIterIndex = 0;
@@ -117,9 +145,9 @@ class TTable {
public:
// Adds new tuple to the table. intColumns, stringColumns - data of columns,
- // stringsSizes - sizes of strings columns, Indexes of null-value columns
+ // stringsSizes - sizes of strings columns. Indexes of null-value columns
// in the form of bit array should be first values of intColumns.
- void AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes);
+ void AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr);
// Resets iterators. In case of join results table it also resets iterators for joined tables
void ResetIterator();
@@ -140,7 +168,8 @@ public:
// Creates new table with key columns and data columns
TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
- ui64 NumberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0);
+ ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
+ ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, TColTypeInterface * colInterfaces = nullptr);
};