/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "DictionaryLoader.hh"
#include "RLE.hh"

#include <limits>

namespace orc {

  namespace {

    /**
     * Fill `buffer` with exactly `bufferSize` bytes pulled from `stream`.
     *
     * Chunks are requested from the stream with Next() and copied to the current
     * write position until `bufferSize` bytes have been written. When `bufferSize`
     * is not positive the loop body never runs, so `stream` is not dereferenced;
     * the caller relies on this to pass a null stream for an empty blob.
     *
     * @param buffer destination; must have room for `bufferSize` bytes
     * @param bufferSize number of bytes to read
     * @param stream source of the bytes
     * @throws ParseError "bad read in readFully" when Next() returns false before
     *         the buffer is full, or "Corrupt dictionary blob" when a chunk would
     *         run past the end of the buffer
     */
    void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
      int64_t posn = 0;
      while (posn < bufferSize) {
        const void* chunk = nullptr;
        int length = 0;
        if (!stream->Next(&chunk, &length)) {
          throw ParseError("bad read in readFully");
        }
        // A chunk is never split: one that does not fit entirely is treated as corruption.
        if (posn + length > bufferSize) {
          throw ParseError("Corrupt dictionary blob");
        }
        memcpy(buffer + posn, chunk, static_cast<size_t>(length));
        posn += length;
      }
    }

  }  // namespace

  /**
   * Load the string dictionary of a column from the current stripe.
   *
   * The entry lengths are decoded from the column's LENGTH stream and turned into
   * cumulative offsets (offset[0] == 0, offset[i] == sum of the first i lengths);
   * the last offset is the total blob size, and that many bytes are then read from
   * the DICTIONARY_DATA stream.
   *
   * @param columnId id of the column whose dictionary is loaded
   * @param stripe provides the column encoding, the streams and the reader metrics
   * @param pool memory pool handed to the dictionary and to the RLE decoder
   * @return the populated dictionary (dictionaryOffset and dictionaryBlob filled in)
   * @throws ParseError when a required stream is missing, when an entry length is
   *         negative, when the summed lengths do not fit in an int64_t, or when the
   *         blob cannot be read in full
   */
  std::shared_ptr<StringDictionary> loadStringDictionary(uint64_t columnId, StripeStreams& stripe,
                                                         MemoryPool& pool) {
    // Get encoding information
    proto::ColumnEncoding encoding = stripe.getEncoding(columnId);
    RleVersion rleVersion = convertRleVersion(encoding.kind());
    uint32_t dictSize = encoding.dictionary_size();

    // Widen before adding one: in 32-bit arithmetic dictSize + 1 wraps to 0 when the
    // encoding claims UINT32_MAX entries, which would leave the offset buffer empty
    // while the code below still writes into it.
    const uint64_t offsetCount = static_cast<uint64_t>(dictSize) + 1;

    // Create the dictionary object
    auto dictionary = std::make_shared<StringDictionary>(pool);

    // Read LENGTH stream to get dictionary entry lengths.
    // A missing stream is only an error when there are entries to decode.
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
    if (dictSize > 0 && stream == nullptr) {
      std::stringstream ss;
      ss << "LENGTH stream not found in StringDictionaryColumn for column " << columnId;
      throw ParseError(ss.str());
    }
    std::unique_ptr<RleDecoder> lengthDecoder =
        createRleDecoder(std::move(stream), false, rleVersion, pool, stripe.getReaderMetrics());

    // Decode dictionary entry lengths into slots 1..dictSize; slot 0 is the leading zero offset
    dictionary->dictionaryOffset.resize(offsetCount);
    int64_t* lengthArray = dictionary->dictionaryOffset.data();
    lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
    lengthArray[0] = 0;

    // Convert lengths to cumulative offsets
    for (uint64_t i = 1; i < offsetCount; ++i) {
      if (lengthArray[i] < 0) {
        std::stringstream ss;
        ss << "Negative dictionary entry length for column " << columnId;
        throw ParseError(ss.str());
      }
      // lengthArray[i - 1] is a sum of non-negative values, so the subtraction cannot
      // overflow; reject the entry instead of letting the signed addition overflow.
      if (lengthArray[i] > std::numeric_limits<int64_t>::max() - lengthArray[i - 1]) {
        std::stringstream ss;
        ss << "Dictionary entry lengths overflow for column " << columnId;
        throw ParseError(ss.str());
      }
      lengthArray[i] += lengthArray[i - 1];
    }
    int64_t blobSize = lengthArray[dictSize];

    // Read DICTIONARY_DATA stream to get dictionary content.
    // A missing stream is only an error when the blob is non-empty.
    dictionary->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
    std::unique_ptr<SeekableInputStream> blobStream =
        stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
    if (blobSize > 0 && blobStream == nullptr) {
      std::stringstream ss;
      ss << "DICTIONARY_DATA stream not found in StringDictionaryColumn for column " << columnId;
      throw ParseError(ss.str());
    }

    // Read the dictionary blob
    readFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get());

    return dictionary;
  }

}  // namespace orc