Reading and Writing ORC Files with Flink


0. Preface

There are two ways to read data in Flink:

* extend RichSourceFunction and override its methods (Flink streaming)

* use an existing connector from the official documentation (Flink streaming and Flink DataSet)


Likewise, there are two ways to write data out of Flink:

* extend RichSinkFunction and override its methods (Flink streaming)

* implement the OutputFormat interface (Flink streaming and Flink DataSet)


1. Reading ORC Files

Approach 1: this dependency pulls in Hadoop transitively. In practice the job ran fine locally, but on the cluster the conflicting Hadoop classes caused it to fail at startup. A possible workaround is sketched right after the dependency block below.

<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.4.0</version>
</dependency>
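
One possible mitigation (an assumption on my part, not something verified in the original post) is to exclude the transitive Hadoop artifacts and rely on the Hadoop jars the cluster classpath already provides:

<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.4.0</version>
    <exclusions>
        <!-- Assumption: the cluster supplies its own Hadoop classpath, so the
             transitive Hadoop artifacts can be excluded to avoid the clash. -->
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>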


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcMapredRecordReader;
import org.apache.orc.mapred.OrcStruct;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class OcrfTest {
    @Test
    public void ocrfReadTest() throws IOException {
        String fileName = "/Users/garfield/Downloads/attempt_20211202020436_135940_m_000194_1012_6011504.orcf";
        List<OrcStruct> orcStructs = localOrcFileToList(fileName);
    }

    public static List<OrcStruct> localOrcFileToList(String filename) throws IOException {
        Path testFilePath = new Path(filename);
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();
        TypeDescription schema = reader.getSchema();
        List<TypeDescription> children = schema.getChildren();
        VectorizedRowBatch batch = schema.createRowBatch();
        int numberOfChildren = children.size();
        List<OrcStruct> resultList = new ArrayList<>();
        while (rows.nextBatch(batch)) {
            // Convert each row of the vectorized batch into an OrcStruct.
            for (int r = 0; r < batch.size; r++) {
                OrcStruct result = new OrcStruct(schema);
                for (int i = 0; i < numberOfChildren; ++i) {
                    // nextValue expects the row index, so pass r here; a constant
                    // would re-read the same row for every record.
                    result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], r,
                            children.get(i), result.getFieldValue(i)));
                }
                resultList.add(result);
                if (r == 10) {
                    break; // debug cap: keep only the first 11 rows of each batch
                }
            }
        }
        }
        rows.close();
        return resultList;
    }
}

Approach 2: read the ORC file with plain Java and wrap the reader in a custom RichSourceFunction (stream processing),

or pass the path in and do the reading inside a map function (batch processing). A sketch of the RichSourceFunction wrapper follows the OrcReader class below.

<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.7.1</version>
</dependency>


import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;


public class OrcReader {
    private static final int BATCH_SIZE = 2048;

    public static List<Map<String, Object>> read(Configuration configuration, String path)
            throws IOException {
        List<Map<String, Object>> rows = new LinkedList<>();

        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            TypeDescription schema = reader.getSchema();
            // Print the schema so the column positions below can be verified.
            System.out.println(schema.toString());

            try (RecordReader records = reader.rows(reader.options())) {
                VectorizedRowBatch batch = schema.createRowBatch(BATCH_SIZE);
                // These column positions are specific to this file's schema;
                // adjust them to match the schema printed above.
                BytesColumnVector q36ColumnVector = (BytesColumnVector) batch.cols[1];
                LongColumnVector firstTsColumnVector = (LongColumnVector) batch.cols[3];
                LongColumnVector lastTsColumnVector = (LongColumnVector) batch.cols[4];
                BytesColumnVector dtAccountIdColumnVector = (BytesColumnVector) batch.cols[11];

                while (records.nextBatch(batch)) {
                    for (int rowNum = 0; rowNum < batch.size; rowNum++) {
                        Map<String, Object> map = new HashMap<>();
                        map.put("qimei36", q36ColumnVector.toString(rowNum));
                        map.put("firstTs", firstTsColumnVector.vector[rowNum]);
                        map.put("lastTs", lastTsColumnVector.vector[rowNum]);
                        map.put("dtAccountId", dtAccountIdColumnVector.toString(rowNum));
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }
}
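
As mentioned above, for stream processing this plain-Java reader can be wrapped in a custom RichSourceFunction. Below is a minimal sketch, assuming the OrcReader class above is on the classpath; OrcSource and its constructor parameter are hypothetical names, and the whole file is read eagerly, so this suits bounded inputs rather than production streaming.

import java.util.Map;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.conf.Configuration;

public class OrcSource extends RichSourceFunction<Map<String, Object>> {
    private final String path;              // ORC file to read (hypothetical parameter)
    private volatile boolean running = true;

    public OrcSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
        // Read the whole file up front via the OrcReader above, then emit row by row.
        for (Map<String, Object> row : OrcReader.read(new Configuration(), path)) {
            if (!running) {
                break;
            }
            ctx.collect(row);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

A streaming job could then attach it with env.addSource(new OrcSource(path)).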

2. Writing ORC Files

Writing uses orc-core directly: build a TypeDescription from a struct string, fill a VectorizedRowBatch column by column, and flush full batches to the Writer. A RichSinkFunction sketch of the same logic follows the class.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class OrcFileWriter {
    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> data = new LinkedList<>();
        data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
        data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
        data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));

        write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);
    }

    public static void write(Configuration config, String path, String struct, List<Map<String, Object>> data) throws IOException {
        TypeDescription schema = TypeDescription.fromString(struct);
        VectorizedRowBatch batch = schema.createRowBatch();

        // Note: ORC backs int columns with LongColumnVector and float columns
        // with DoubleColumnVector; the positions match the struct string above.
        LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
        BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
        DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];

        try (Writer writer = OrcFile.createWriter(new Path(path), OrcFile.writerOptions(config).setSchema(schema))) {
            for (Map<String, Object> row : data) {
                // Claim the next row slot in the batch.
                int rowNum = batch.size++;

                orderIdColumnVector.vector[rowNum] = (Integer) row.get("order_id");
                byte[] buffer = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
                itemNameColumnVector.setRef(rowNum, buffer, 0, buffer.length);
                priceColumnVector.vector[rowNum] = (Float) row.get("price");

                // Flush the batch once it is full, then reuse it.
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }

            // Flush any remaining rows in the final, partially filled batch.
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }
        }
    }
}
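
The preface also mentions writing via RichSinkFunction. Here is a minimal sketch along those lines, assuming the same orders schema as above; OrcSink is a hypothetical name, the sink has no checkpoint integration, and it would only be safe at parallelism 1 since every subtask writes the same path.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcSink extends RichSinkFunction<Map<String, Object>> {
    private final String path;
    private transient Writer writer;
    private transient VectorizedRowBatch batch;

    public OrcSink(String path) {
        this.path = path;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumption: same schema as OrcFileWriter above.
        TypeDescription schema =
                TypeDescription.fromString("struct<order_id:int,item_name:string,price:float>");
        writer = OrcFile.createWriter(new Path(path),
                OrcFile.writerOptions(new org.apache.hadoop.conf.Configuration())
                        .setSchema(schema));
        batch = schema.createRowBatch();
    }

    @Override
    public void invoke(Map<String, Object> row, Context context) throws Exception {
        int rowNum = batch.size++;
        ((LongColumnVector) batch.cols[0]).vector[rowNum] = (Integer) row.get("order_id");
        byte[] name = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[1]).setRef(rowNum, name, 0, name.length);
        ((DoubleColumnVector) batch.cols[2]).vector[rowNum] = (Float) row.get("price");
        // Flush full batches as records arrive.
        if (batch.size == batch.getMaxSize()) {
            writer.addRowBatch(batch);
            batch.reset();
        }
    }

    @Override
    public void close() throws Exception {
        // Flush the final partial batch and close the file when the job ends.
        if (batch != null && batch.size != 0) {
            writer.addRowBatch(batch);
        }
        if (writer != null) {
            writer.close();
        }
    }
}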

