diff options
author | aakulaga <aakulaga@ydb.tech> | 2022-11-28 10:29:34 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2022-11-28 10:29:34 +0300 |
commit | 45aa07c0d3dea21982be1e7902b27d040d2e655f (patch) | |
tree | bc2222a5eca3d7675124e15c59a85402915b3566 | |
parent | d40a37c8cd5a5eece929900c7b8016c17009c774 (diff) | |
download | ydb-45aa07c0d3dea21982be1e7902b27d040d2e655f.tar.gz |
PGSQL types added
PGSQL types added
3 files changed, 333 insertions, 82 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index c431c1a8a48..75a6b96b2a3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -15,6 +15,7 @@ #include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <chrono> @@ -31,6 +32,9 @@ struct TColumnDataPackInfo { NUdf::EDataSlot DataType = NUdf::EDataSlot::Uint32; // Data type of the column for standard types (TDataType) bool IsKeyColumn = false; // True if this columns is key for join bool IsString = false; // True if value is string + bool IsPgType = false; // True if column is PG type + bool IsPresortSupported = false; // True if pg type supports presort and can be interpreted as string value + bool IsIType = false; // True if column need to be processed via IHash, IEquate interfaces ui32 Offset = 0; // Offset of column in packed data // TValuePacker Packer; // Packer for composite data types }; @@ -53,14 +57,19 @@ struct TGraceJoinPacker { std::vector<NUdf::TUnboxedValue> TupleHolder; // Storage for tuple data std::vector<NUdf::TUnboxedValue*> TuplePtrs; // Storage for tuple data pointers to use in FetchValues std::vector<std::string> TupleStringHolder; // Storage for complex tuple data types serialized to strings + std::vector<NUdf::TUnboxedValue> IColumnsHolder; // Storage for interface-based types (IHash, IEquate) GraceJoin::TupleData JoinTupleData; // TupleData to get join results ui64 TotalColumnsNum = 0; // Total number of columns to pack ui64 TotalIntColumnsNum = 0; // Total number of int columns ui64 TotalStrColumnsNum = 0; // Total number of string columns + ui64 TotalIColumnsNum = 0; // Total number of interface-based columns ui64 KeyIntColumnsNum = 0; // Total number of key int columns ui64 KeyStrColumnsNum = 0; // Total number of key string columns + ui64 KeyIColumnsNum = 0; // Total number of interface-based columns ui64 DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum; ui64 DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum; + ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum; + std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces; inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory); @@ -81,11 +90,25 @@ TColumnDataPackInfo GetPackInfo(TType* type) { colType = type; } + if (type->GetKind() == TType::EKind::Pg ) { + + TPgType* pgType = AS_TYPE(TPgType, type); + + res.IsPgType = true; + if (pgType->IsPresortSupported()) { + res.IsPresortSupported = true; + res.IsString = true; + res.DataType = NUdf::EDataSlot::String; + } else { + res.IsIType = true; + } + return res; + } + if (colType->GetKind() != TType::EKind::Data) { - MKQL_ENSURE(false, "Unknown data type."); res.IsString = true; res.DataType = NUdf::EDataSlot::String; - return; + return res; } colTypeId = AS_TYPE(TDataType, colType)->GetSchemeType(); @@ -103,7 +126,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) { case NUdf::EDataSlot::Int16: res.Bytes = sizeof(i16); break; case NUdf::EDataSlot::Uint16: - res.Bytes = sizeof(i16); break; + res.Bytes = sizeof(ui16); break; case NUdf::EDataSlot::Int32: res.Bytes = sizeof(i32); break; case NUdf::EDataSlot::Uint32: @@ -117,7 +140,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) { case NUdf::EDataSlot::Double: res.Bytes = sizeof(double); break; case NUdf::EDataSlot::Date: - res.Bytes = sizeof(ui64); break; + res.Bytes = sizeof(ui16); break; case NUdf::EDataSlot::Datetime: res.Bytes = sizeof(ui32); break; case NUdf::EDataSlot::Timestamp: @@ -136,6 +159,12 @@ TColumnDataPackInfo GetPackInfo(TType* type) { res.Bytes = 16; break; case NUdf::EDataSlot::String: res.IsString = true; break; + case NUdf::EDataSlot::Utf8: + res.IsString = true; break; + case NUdf::EDataSlot::Yson: + res.IsString = true; break; + case NUdf::EDataSlot::Json: + res.IsString = true; break; default: { MKQL_ENSURE(false, "Unknown data type."); @@ -176,10 +205,14 @@ void TGraceJoinPacker::Pack() { } if (colType->GetKind() != TType::EKind::Data) { - TStringBuf strBuf = Packers[i].Pack(value); - TupleStringHolder[i] = strBuf; - TupleStrings[offset] = TupleStringHolder[i].data(); - TupleStrSizes[offset] = TupleStringHolder[i].size(); + if (pi.IsIType ) { // Interface-based type + IColumnsHolder[offset] = value; + } else { + TStringBuf strBuf = Packers[i].Pack(value); + TupleStringHolder[i] = strBuf; + TupleStrings[offset] = TupleStringHolder[i].data(); + TupleStrSizes[offset] = TupleStringHolder[i].size(); + } continue; } @@ -285,6 +318,10 @@ void TGraceJoinPacker::UnPack() { } if (colType->GetKind() != TType::EKind::Data) { + if (colType->GetKind() == TType::EKind::Pg) { + value = IColumnsHolder[offset]; + continue; + } value = Packers[i].Unpack(TStringBuf(TupleStrings[offset], TupleStrSizes[offset]), HolderFactory); continue; } @@ -395,21 +432,29 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con nColumns = ColumnsPackInfo.size(); - ui64 totalIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return !a.IsString; }); - ui64 totalStrColumnsNum = nColumns - totalIntColumnsNum; + ui64 totalIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return !a.IsString && !a.IsPgType; }); + ui64 totalIColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return a.IsIType; }); + ui64 totalStrColumnsNum = nColumns - totalIntColumnsNum - totalIColumnsNum; - ui64 keyIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && !a.IsString );}); - ui64 keyStrColumnsNum = nKeyColumns - keyIntColumnsNum; + ui64 keyIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && !a.IsString && !a.IsPgType);}); + ui64 keyIColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && a.IsIType);}); + ui64 keyStrColumnsNum = nKeyColumns - keyIntColumnsNum - keyIColumnsNum; ui64 dataIntColumnsNum = totalIntColumnsNum - keyIntColumnsNum; ui64 dataStrColumnsNum = totalStrColumnsNum - keyStrColumnsNum; TotalColumnsNum = nColumns; TotalIntColumnsNum = totalIntColumnsNum; TotalStrColumnsNum = totalStrColumnsNum; + TotalIColumnsNum = totalIColumnsNum; + + KeyIntColumnsNum = keyIntColumnsNum; KeyStrColumnsNum = keyStrColumnsNum; + KeyIColumnsNum = keyIColumnsNum; + DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum; DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum; + DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum; NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ; @@ -432,27 +477,44 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con Offsets.resize(nColumns); TupleHolder.resize(nColumns); TupleStringHolder.resize(nColumns); + IColumnsHolder.resize(nColumns); + + JoinTupleData.IColumns = IColumnsHolder.data(); std::transform(TupleHolder.begin(), TupleHolder.end(), std::back_inserter(TuplePtrs), [](NUdf::TUnboxedValue& v) { return std::addressof(v); }); ui32 currIntOffset = NullsBitmapSize * sizeof(ui64) ; ui32 currStrOffset = 0; + ui32 currIOffset = 0; ui32 currIdx = 0; + std::vector<GraceJoin::TColTypeInterface> ctiv; for( auto & p: ColumnsPackInfo ) { - if ( !p.IsString ) { + if ( !p.IsString && !p.IsIType ) { p.Offset = currIntOffset; Offsets[p.ColumnIdx] = currIntOffset; currIntOffset += p.Bytes; - } else { + } else if ( p.IsString ) { p.Offset = currStrOffset; Offsets[p.ColumnIdx] = currStrOffset; currStrOffset++; + } else if (p.IsIType) { + p.Offset = currIOffset; + Offsets[p.ColumnIdx] = currIOffset; + currIOffset++; + GraceJoin::TColTypeInterface cti{ MakeHashImpl(p.MKQLType), MakeEquateImpl(p.MKQLType), TValuePacker(true, p.MKQLType) , HolderFactory }; + ColumnInterfaces.push_back(cti); } currIdx++; } - TablePtr = std::make_unique<GraceJoin::TTable>(keyIntColumnsNum, keyStrColumnsNum, dataIntColumnsNum, dataStrColumnsNum); + GraceJoin::TColTypeInterface * cti_p = nullptr; + + if (TotalIColumnsNum > 0 ) { + cti_p = ColumnInterfaces.data(); + } + + TablePtr = std::make_unique<GraceJoin::TTable>(KeyIntColumnsNum, KeyStrColumnsNum, DataIntColumnsNum, DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, cti_p ); } @@ -653,7 +715,7 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox LeftPacker->StartTime = std::chrono::system_clock::now(); } LeftPacker->Pack(); - LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data()); + LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data(), LeftPacker->IColumnsHolder.data()); } if (resultRight == EFetchResult::One) { @@ -662,7 +724,7 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox } RightPacker->Pack(); - RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data()); + RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data()); } if (resultLeft == EFetchResult::Yield || resultRight == EFetchResult::Yield) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index ef96dc1d7dd..248c2d6faf6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -13,24 +13,37 @@ namespace GraceJoin { -void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes ) { +void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes, NYql::NUdf::TUnboxedValue * iColumns ) { TotalPacked++; TempTuple.clear(); TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize + NumberOfKeyIntColumns); + if ( NumberOfKeyIColumns > 0 ) { + for (ui32 i = 0; i < NumberOfKeyIColumns; i++) { + TempTuple.push_back((ColInterfaces + i)->HashI->Hash(*(iColumns+i))); + } + } + ui64 totalBytesForStrings = 0; ui64 totalIntsForStrings = 0; // Processing variable length string columns - if ( NumberOfKeyStringColumns != 0 ) { + if ( NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) { for( ui64 i = 0; i < NumberOfKeyStringColumns; i++ ) { totalBytesForStrings += stringsSizes[i]; } + for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) { + + TStringBuf val = (ColInterfaces + i)->Packer.Pack(*(iColumns+i)); + IColumnsVals[i] = val; + totalBytesForStrings += val.size(); + } + totalIntsForStrings = (totalBytesForStrings + sizeof(ui64) - 1) / sizeof(ui64); TempTuple.push_back(totalIntsForStrings); @@ -46,9 +59,13 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings currStrPtr+=stringsSizes[i]; } - } + for( ui64 i = 0; i < NumberOfKeyIColumns; i++) { + std::memcpy(currStrPtr, IColumnsVals[i].data(), IColumnsVals[i].size() ); + currStrPtr+=IColumnsVals[i].size(); + } + } XXH64_hash_t hash = XXH64(TempTuple.data(), TempTuple.size() * sizeof(ui64), 0); @@ -70,15 +87,29 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings keyIntVals.push_back(hash); keyIntVals.insert(keyIntVals.end(), TempTuple.begin(), TempTuple.end()); - if (NumberOfStringColumns) { + if (NumberOfStringColumns || NumberOfIColumns ) { stringsOffsets.push_back(offset); // Adding offset to tuple in keyIntVals vector stringsOffsets.push_back(stringVals.size()); // Adding offset to string values + // Adding strings sizes for keys and data - stringsOffsets.insert( stringsOffsets.end(), stringsSizes, stringsSizes+NumberOfStringColumns ); + if ( NumberOfStringColumns ) { + stringsOffsets.insert( stringsOffsets.end(), stringsSizes, stringsSizes+NumberOfStringColumns ); + } + + if ( NumberOfIColumns ) { + for ( ui64 i = NumberOfKeyIColumns - 1; i < NumberOfIColumns; i++) { + TStringBuf val = (ColInterfaces + i)->Packer.Pack(*(iColumns+i)); + IColumnsVals[i] = val; + } + for (ui64 i = 0; i < NumberOfIColumns; i++ ) { + stringsOffsets.push_back(IColumnsVals[i].size()); + } + } } + // Adding data values ui64 * dataColumns = intColumns + NullsBitmapSize + NumberOfKeyIntColumns; dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns); @@ -87,9 +118,14 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings char ** dataStringsColumns = stringColumns + NumberOfKeyStringColumns; ui32 * dataStringsSizes = stringsSizes + NumberOfKeyStringColumns; - for( ui64 i = 0; i < NumberOfDataStringColumns; i++) { - ui32 currStringSize = *(dataStringsSizes + i); - stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize); + for( ui64 i = 0; i < NumberOfDataStringColumns; i++) { + ui32 currStringSize = *(dataStringsSizes + i); + stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize); + } + + for ( ui64 i = 0; i < NumberOfDataIColumns; i++) { + stringVals.insert( stringVals.end(), IColumnsVals[NumberOfKeyIColumns + i].cbegin(), IColumnsVals[NumberOfKeyIColumns + i].end()); + } @@ -152,6 +188,44 @@ inline bool HasBitSet( ui64 * buf, ui64 Nbits ) { return ((*buf) << (sizeof(ui64) - Nbits)); } + +inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1, + const ui32* stringSizes2, const char * vals2, + TColTypeInterface * colInterfaces, ui64 nStringColumns, ui64 nIColumns) { + ui32 currOffset1 = 0; + ui32 currOffset2 = 0; + char * currVal1 = 0; + char * currVal2 = 0; + ui32 currSize1 = 0; + ui32 currSize2 = 0; + NYql::NUdf::TUnboxedValue val1, val2; + TStringBuf str1, str2; + + for (ui32 i = 0; i < nStringColumns; i ++) { + currSize1 = *(stringSizes1 + i); + currSize2 = *(stringSizes2 + i); + currOffset1 += currSize1; + currOffset2 += currSize2; + } + for (ui32 i = 0; i < nIColumns; i ++) { + + currSize1 = *(stringSizes1 + nStringColumns + i ); + currSize2 = *(stringSizes2 + nStringColumns + i ); + str1 = TStringBuf(vals1 + currOffset1, currSize1); + val1 = (colInterfaces + i)->Packer.Unpack(str1, colInterfaces->HolderFactory); + str2 = TStringBuf(vals2 + currOffset2, currSize2 ); + val2 = (colInterfaces + i)->Packer.Unpack(str2, colInterfaces->HolderFactory); + if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) { + return false; + } + + currOffset1 += currSize1; + currOffset2 += currSize2; + + } + return true; +} + // Joins two tables and returns join result in joined table. Tuples of joined table could be received by // joined table iterator void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { @@ -174,9 +248,12 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { ui64 tuplesFound = 0; std::vector<ui64> joinSlots, spillSlots, slotToIdx; + std::vector<ui32> stringsOffsets1, stringsOffsets2; ui64 reservedSize = 6 * (DefaultTupleBytes * DefaultTuplesNum) / sizeof(ui64); joinSlots.reserve( reservedSize ); spillSlots.reserve( reservedSize ); + stringsOffsets1.reserve(JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 1); + stringsOffsets2.reserve(JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 1); std::vector < JoinTuplesIds > joinResults; @@ -196,9 +273,10 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { ui64 headerSize = JoinTable1->HeaderSize; ui64 slotSize = headerSize; - if (JoinTable1->NumberOfKeyStringColumns != 0) { - ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - bucket2->TuplesNum * headerSize) ) / ( 2 * bucket2->TuplesNum + 1) + 1; - slotSize = headerSize + avgStringsSize; + ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - bucket2->TuplesNum * headerSize) ) / ( 2 * bucket2->TuplesNum + 1) + 1; + + if (JoinTable1->NumberOfKeyStringColumns != 0 || JoinTable1->NumberOfKeyIColumns != 0) { + slotSize = slotSize + avgStringsSize; } ui64 nSlots = 3 * bucket2->TuplesNum + 1; @@ -211,7 +289,12 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { ui32 tuple2Idx = 0; auto it2 = bucket2->KeyIntVals.begin(); while (it2 != bucket2->KeyIntVals.end() ) { - ui64 valSize = (!JoinTable2->NumberOfKeyStringColumns) ? headerSize : headerSize + *(it2 + headerSize - JoinTable2->TotalStringsSize) ; + ui64 keysValSize; + if ( JoinTable2->NumberOfKeyStringColumns > 0 || JoinTable2->NumberOfKeyIColumns > 0) { + keysValSize = headerSize + *(it2 + headerSize - 1) ; + } else { + keysValSize = headerSize; + } ui64 hash = *it2; ui64 * nullsPtr = it2+1; if (!HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns)) @@ -227,21 +310,21 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { slotIt = joinSlots.begin(); } - if (valSize <= slotSize) + if (keysValSize <= slotSize) { - std::copy_n(it2, valSize, slotIt); + std::copy_n(it2, keysValSize, slotIt); } else { std::copy_n(it2, headerSize, slotIt); ui64 stringsPos = spillSlots.size(); - spillSlots.insert(spillSlots.end(), it2 + headerSize, it2 + headerSize + *(it2 + headerSize - JoinTable2->TotalStringsSize)); + spillSlots.insert(spillSlots.end(), it2 + headerSize, it2 + keysValSize); *(slotIt + headerSize) = stringsPos; } ui64 currSlotNum = (slotIt - joinSlots.begin()) / slotSize; slotToIdx[currSlotNum] = tuple2Idx; } - it2 += valSize; + it2 += keysValSize; tuple2Idx ++; } @@ -250,39 +333,97 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { auto it1 = bucket1->KeyIntVals.begin(); while ( it1 < bucket1->KeyIntVals.end() ) { - ui64 valSize = (!JoinTable1->NumberOfKeyStringColumns) ? headerSize : headerSize + *(it1 + headerSize - JoinTable1->TotalStringsSize); + ui64 keysValSize; + if ( JoinTable1->NumberOfKeyStringColumns > 0 || JoinTable1->NumberOfKeyIColumns > 0) { + keysValSize = headerSize + *(it1 + headerSize - 1) ; + } else { + keysValSize = headerSize; + } + ui64 hash = *it1; ui64 * nullsPtr = it1+1; - if (!HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns)) + if (HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns)) { - ui64 slotNum = hash % nSlots; - auto slotIt = joinSlots.begin() + slotNum * slotSize; - ui64 collisions = 0; - while (*slotIt != 0 && slotIt != joinSlots.end()) - { - bool matchFound = false; - if (valSize <= slotSize) - { - if (std::equal(it1, it1 + valSize, slotIt)) - { + it1 += keysValSize; + tuple1Idx ++; + continue; + } + + ui64 slotNum = hash % nSlots; + auto slotIt = joinSlots.begin() + slotNum * slotSize; + ui64 collisions = 0; + while (*slotIt != 0 && slotIt != joinSlots.end()) + { + bool matchFound = false; + if (keysValSize <= slotSize && !JoinTable1->NumberOfKeyIColumns ) { + if (std::equal(it1, it1 + keysValSize, slotIt)) { + tuplesFound++; + matchFound = true; + } + } + + if (keysValSize > slotSize && !JoinTable1->NumberOfKeyIColumns ) { + if (std::equal(it1, it1 + headerSize, slotIt)) { + ui64 stringsPos = *(slotIt + headerSize); + ui64 stringsSize = *(slotIt + headerSize - 1); + if (std::equal(it1 + headerSize, it1 + headerSize + stringsSize, spillSlots.begin() + stringsPos)) { tuplesFound++; matchFound = true; } } - else - { - if (std::equal(it1, it1 + headerSize, slotIt)) - { - ui64 stringsPos = *(slotIt + headerSize); - ui64 stringsSize = *(slotIt + headerSize - 1); - if (std::equal(it1 + headerSize, it1 + headerSize + stringsSize, spillSlots.begin() + stringsPos)) - { - tuplesFound++; - matchFound = true; - } + } + + if (JoinTable1->NumberOfKeyIColumns) + { + bool headerMatch = false; + bool stringsMatch = false; + bool iValuesMatch = false; + + if (std::equal(it1, it1 + headerSize - 1, slotIt)) { + headerMatch = true; + } + + auto slotStringsStart = slotIt + headerSize; + + if (keysValSize > slotSize ) { + ui64 stringsPos = *(slotIt + headerSize); + slotStringsStart = spillSlots.begin() + stringsPos; + ui64 stringsSize = *(slotIt + headerSize - 1); + + } + + if ( JoinTable1->NumberOfKeyStringColumns == 0) { + stringsMatch = true; + } else { + if (headerMatch && std::equal( it1 + headerSize, it1 + headerSize + JoinTable1->NumberOfKeyStringColumns, slotStringsStart )) { + stringsMatch = true; } } + if (headerMatch && stringsMatch ) { + + + tuple2Idx = slotToIdx[(slotIt - joinSlots.begin()) / slotSize]; + i64 stringsOffsetsIdx1 = tuple1Idx * (JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 2); + ui64 stringsOffsetsIdx2 = tuple2Idx * (JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 2); + ui32 * stringsSizesPtr1 = bucket1->StringsOffsets.data() + stringsOffsetsIdx1 + 2; + ui32 * stringsSizesPtr2 = bucket2->StringsOffsets.data() + stringsOffsetsIdx2 + 2; + + + iValuesMatch = CompareIColumns( stringsSizesPtr1 , + (char *) (it1 + headerSize ), + stringsSizesPtr2, + (char *) (slotStringsStart), + JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns ); + } + + if (headerMatch && stringsMatch && iValuesMatch) { + tuplesFound++; + matchFound = true; + } + + } + if (matchFound) { JoinTuplesIds joinIds; @@ -299,9 +440,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { if (slotIt == joinSlots.end()) slotIt = joinSlots.begin(); } - } + - it1 += valSize; + it1 += keysValSize; tuple1Idx ++; } std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b) @@ -337,9 +478,9 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { td.AllNulls = false; TTableBucket & tb = TableBuckets[bucketNum]; - ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + 2); + ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2); - if(NumberOfKeyStringColumns != 0) { + if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns !=0 ) { keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx]; } else { keyIntsOffset = HeaderSize * tupleId; @@ -357,7 +498,7 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { } char *strPtr = nullptr; - if(NumberOfKeyStringColumns != 0) { + if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) { keyStringsOffset = tb.StringsOffsets[stringsOffsetsIdx] + HeaderSize; strPtr = reinterpret_cast<char *>(tb.KeyIntVals.data() + keyStringsOffset); @@ -368,6 +509,14 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { td.StrSizes[i] = tb.StringsOffsets[stringsOffsetsIdx + 2 + i]; strPtr += td.StrSizes[i]; } + + for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) { + ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + NumberOfKeyStringColumns + i]; + *(td.IColumns + i) = (ColInterfaces + i)->Packer.Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory); + strPtr += currSize; + } + + } @@ -383,7 +532,7 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { strPtr += td.StrSizes[currIdx]; } - + } inline bool TTable::HasJoinedTupleId(TTable *joinedTable, ui32 &tupleId2) { @@ -699,37 +848,48 @@ void TTable::Clear() { // Creates new table with key columns and data columns TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, - ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns ) : + ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns, + ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns, TColTypeInterface * colInterfaces ) : NumberOfKeyIntColumns(numberOfKeyIntColumns), NumberOfKeyStringColumns(numberOfKeyStringColumns), NumberOfDataIntColumns(numberOfDataIntColumns), - NumberOfDataStringColumns(numberOfDataStringColumns) { + NumberOfDataStringColumns(numberOfDataStringColumns), + NumberOfKeyIColumns(numberOfKeyIColumns), + NumberOfDataIColumns(numberOfDataIColumns), + ColInterfaces(colInterfaces) { - NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns; - NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns; + NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns; + NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns; NumberOfColumns = NumberOfKeyColumns + NumberOfDataColumns; NumberOfStringColumns = NumberOfKeyStringColumns + NumberOfDataStringColumns; + NumberOfIColumns = NumberOfKeyIColumns + NumberOfDataIColumns; BytesInKeyIntColumns = NumberOfKeyIntColumns * sizeof(ui64); NullsBitmapSize = NumberOfColumns / (8 * sizeof(ui64)) + 1; - TotalStringsSize = (numberOfKeyStringColumns > 0 ) ? 1 : 0; + TotalStringsSize = (numberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0 ) ? 1 : 0; - HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize; + HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; TableBuckets.resize(NumberOfBuckets); const ui64 reservedSizePerTuple = (2 * DefaultTupleBytes) / sizeof(ui64); TempTuple.reserve( reservedSizePerTuple ); + IColumnsHashes.resize(NumberOfKeyIColumns); + IColumnsVals.resize(NumberOfIColumns); + + const ui64 totalForTuples = DefaultTuplesNum * reservedSizePerTuple; for ( auto & b: TableBuckets ) { - b.KeyIntVals.reserve(DefaultTuplesNum * reservedSizePerTuple ); - b.StringsOffsets.reserve(DefaultTuplesNum * reservedSizePerTuple); - b.DataIntVals.reserve( DefaultTuplesNum * reservedSizePerTuple); - b.StringsValues.reserve( DefaultTuplesNum * reservedSizePerTuple); + b.KeyIntVals.reserve( (totalForTuples * NumberOfKeyColumns) / (NumberOfColumns + 1) ); + b.StringsOffsets.reserve((totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1)); + b.DataIntVals.reserve( (totalForTuples * NumberOfDataIntColumns) / (NumberOfColumns + 1)); + b.StringsValues.reserve( (totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1) ); + b.InterfaceOffsets.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1) ); + b.InterfaceValues.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1)); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 934a897602b..a89c492e9ad 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -2,6 +2,8 @@ #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/public/udf/udf_type_builder.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> namespace NKikimr { namespace NMiniKQL { @@ -15,10 +17,11 @@ const ui64 DefaultTupleBytes = 512; // Default size of all columns in table row const ui64 HashSize = 1; // Using ui64 hash size /* -Table data stored in buckets. Table columns are interpreted either as integers or strings. +Table data stored in buckets. Table columns are interpreted either as integers, strings or some interface-based type, +providing IHash, IEquate, IPack and IUnpack functions. External clients should transform (pack) data into appropriate presentation. -Key columns always first, following int columns and string columns. +Key columns always first, following int columns, string columns and interface-based columns. Optimum presentation of table data is chosen based on locality to perform most processing of related data in processor caches. @@ -45,7 +48,10 @@ struct TTableBucket { std::vector<ui64> KeyIntVals; // Vector to store table key values std::vector<ui64> DataIntVals; // Vector to store data values in bucket std::vector<char> StringsValues; // Vector to store data strings values - std::vector<ui32> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple. + std::vector<ui32> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple. + std::vector<char> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces + std::vector<ui32> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces + std::vector<JoinTuplesIds> JoinIds; // Results of join operations stored as index of tuples in buckets // of two tables with the same number std::vector<ui32> RightIds; // Sorted Ids of right table joined tuples to process full join and exclusion join @@ -56,28 +62,44 @@ struct TupleData { ui64 * IntColumns = nullptr; // Array of packed int data of the table. Caller should allocate array of NumberOfIntColumns size char ** StrColumns = nullptr; // Pointers to values of strings for table. Strings are not null-terminated ui32 * StrSizes = nullptr; // Sizes of strings for table. + NYql::NUdf::TUnboxedValue * IColumns = nullptr; // Array of TUboxedValues for interface-based columns. Caller should allocate array of required size. bool AllNulls = false; // If tuple data contains all nulls (it is required for corresponding join types) }; +// Interface to work with complex column types without "simple" byte-serialized representation (which can be used for keys comparison) +struct TColTypeInterface { + NYql::NUdf::IHash::TPtr HashI = nullptr; // Interface to calculate hash of column value + NYql::NUdf::IEquate::TPtr EquateI = nullptr; // Interface to compare two column values + TValuePacker Packer; // Class to pack and unpack column values + const THolderFactory& HolderFactory; // To use during unpacking +}; + // Class which represents single table data stored in buckets class TTable { ui64 NumberOfKeyIntColumns = 0; // Key int columns always first and padded to sizeof(ui64). - ui64 NumberOfKeyStringColumns = 0; // String key columns go after key int columns + ui64 NumberOfKeyStringColumns = 0; // String key columns go after key int columns + ui64 NumberOfKeyIColumns = 0; // Number of interface - provided key columns + + + ui64 NumberOfDataIntColumns = 0; //Number of integer data columns in the Table + ui64 NumberOfDataStringColumns = 0; // Number of strings data columns in the Table + ui64 NumberOfDataIColumns = 0; // Number of interface - provided data columns + + TColTypeInterface * ColInterfaces = nullptr; // Array of interfaces to work with corresponding columns data - ui64 NumberOfDataIntColumns = 0; //Number of integer columns in the Table - ui64 NumberOfDataStringColumns = 0; // Number of strings columns in the Table ui64 NumberOfColumns = 0; // Number of columns in the Table ui64 NumberOfKeyColumns = 0; // Number of key columns in the Table ui64 NumberOfDataColumns = 0; // Number of data columns in the Table ui64 NumberOfStringColumns = 0; // Total number of String Columns + ui64 NumberOfIColumns = 0; // Total number of interface-based columns ui64 NullsBitmapSize = 1; // Default size of ui64 values used for null columns bitmap. // Every bit set means null value. Order of columns is equal to order in AddTuple call. // First key int column is bit 0 in bit mask, second - bit 1, etc. Bit 0 is least significant in bitmask. ui64 TotalStringsSize = 0; // Bytes in tuple header reserved to store total strings size key tuple columns - ui64 HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize; // Header of all tuples size + ui64 HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns; @@ -87,6 +109,12 @@ class TTable { // Temporary vector for tuples manipulation; std::vector<ui64> TempTuple; + // Hashes for interface - based columns values + std::vector<ui64> IColumnsHashes; + + // Serialized values for interface-based columns + std::vector<TString> IColumnsVals; + // Current iterator index for NextTuple iterator ui64 CurrIterIndex = 0; @@ -117,9 +145,9 @@ class TTable { public: // Adds new tuple to the table. intColumns, stringColumns - data of columns, - // stringsSizes - sizes of strings columns, Indexes of null-value columns + // stringsSizes - sizes of strings columns. Indexes of null-value columns // in the form of bit array should be first values of intColumns. - void AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes); + void AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr); // Resets iterators. In case of join results table it also resets iterators for joined tables void ResetIterator(); @@ -140,7 +168,8 @@ public: // Creates new table with key columns and data columns TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0, - ui64 NumberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0); + ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, + ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, TColTypeInterface * colInterfaces = nullptr); }; |