Reading and Writing ORC with Flink
0. Preface
There are two ways to read data in Flink:
* Extend RichSourceFunction and override its methods (Flink Streaming); a skeleton is sketched after this list
* Use a connector from the official documentation, if one exists (Flink Streaming and Flink DataSet)
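For reference, a minimal RichSourceFunction skeleton might look like the sketch below. The class name, the String element type, and the placeholder bodies are illustrative assumptions; the actual ORC reading logic from section 1 would go inside run().

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Minimal source skeleton (illustrative). The ORC reading code from section 1
// would be placed inside run() and each record emitted via ctx.collect().
public class OrcSource extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open readers / connections here
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect("one record"); // emit records downstream
            running = false;           // stop after one pass for a bounded source
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

The source would then be attached with env.addSource(new OrcSource()).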
There are likewise two ways to write data out of Flink:
* Extend RichSinkFunction and override its methods (Flink Streaming); a skeleton is sketched after this list
* Implement the OutputFormat interface (Flink Streaming and Flink DataSet)
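The sink side is symmetric. A minimal RichSinkFunction skeleton could look like the following sketch (class name, element type, and placeholder bodies are assumptions); an OutputFormat variant is sketched at the end of section 2.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Minimal sink skeleton (illustrative). The ORC writing code from section 2
// would create the Writer in open(), fill row batches in invoke(), and flush
// and close the Writer in close().
public class OrcSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // create the ORC Writer here
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // append the record to the current row batch
    }

    @Override
    public void close() throws Exception {
        // flush the last batch and close the Writer
    }
}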
1. Reading ORC Files
Approach 1: this approach has Hadoop among its dependencies. In practice it runs fine locally, but on a cluster the Hadoop dependencies cause the job to fail at startup.
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.4.0</version>
</dependency>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcMapredRecordReader;
import org.apache.orc.mapred.OrcStruct;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class OcrfTest {

    @Test
    public void ocrfReadTest() throws IOException {
        String fileName = "/Users/garfield/Downloads/attempt_20211202020436_135940_m_000194_1012_6011504.orcf";
        List<OrcStruct> orcStructs = localOrcFileToList(fileName);
    }

    public static List<OrcStruct> localOrcFileToList(String filename) throws IOException {
        Path testFilePath = new Path(filename);
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();
        TypeDescription schema = reader.getSchema();
        List<TypeDescription> children = schema.getChildren();
        VectorizedRowBatch batch = schema.createRowBatch();
        int numberOfChildren = children.size();
        List<OrcStruct> resultList = new ArrayList<>();
        while (rows.nextBatch(batch)) {
            for (int r = 0; r < batch.size; r++) {
                OrcStruct result = new OrcStruct(schema);
                // copy every column of row r into the OrcStruct
                for (int i = 0; i < numberOfChildren; ++i) {
                    result.setFieldValue(i, OrcMapredRecordReader.nextValue(
                            batch.cols[i], r, children.get(i), result.getFieldValue(i)));
                }
                resultList.add(result);
                // only sample the first rows of each batch
                if (r == 10) {
                    break;
                }
            }
        }
        rows.close();
        return resultList;
    }
}
Approach 2: read the ORC file with plain Java, either from a custom RichSourceFunction (stream processing) or by passing the path into the job and reading it inside a map (batch processing); a sketch of wiring this into Flink follows the reader class below.
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.7.1</version>
</dependency>
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

public class OrcReader {

    private static final int BATCH_SIZE = 2048;

    public static List<Map<String, Object>> read(Configuration configuration, String path) throws IOException {
        List<Map<String, Object>> rows = new LinkedList<>();
        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            TypeDescription schema = reader.getSchema();
            System.out.println(schema.toString());
            try (RecordReader records = reader.rows(reader.options())) {
                VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
                // the column indices below are specific to this file's schema
                BytesColumnVector q36ColumnVector = (BytesColumnVector) batch.cols[1];
                LongColumnVector firstTsColumnVector = (LongColumnVector) batch.cols[3];
                LongColumnVector lastTsColumnVector = (LongColumnVector) batch.cols[4];
                BytesColumnVector dtAccountIdColumnVector = (BytesColumnVector) batch.cols[11];
                while (records.nextBatch(batch)) {
                    for (int rowNum = 0; rowNum < batch.size; rowNum++) {
                        Map<String, Object> map = new HashMap<>();
                        map.put("qimei36", q36ColumnVector.toString(rowNum));
                        map.put("firstTs", firstTsColumnVector.vector[rowNum]);
                        map.put("lastTs", lastTsColumnVector.vector[rowNum]);
                        map.put("dtAccountId", dtAccountIdColumnVector.toString(rowNum));
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }
}
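To plug the reader above into Flink, one option is a custom RichSourceFunction that takes the path, calls OrcReader.read(), and emits each row map. This is a sketch only: the class name OrcMapSource and the constructor-injected path are assumptions, not part of the original post. For the batch variant, the path can instead be passed as a plain element and the same OrcReader.read() call placed inside a map or flatMap.

import java.util.Map;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.conf.Configuration;

// Illustrative wrapper: emits every row returned by OrcReader.read() as a Map.
public class OrcMapSource extends RichSourceFunction<Map<String, Object>> {

    private final String path;
    private volatile boolean running = true;

    public OrcMapSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
        // read the whole file with the helper above, then emit row by row
        for (Map<String, Object> row : OrcReader.read(new Configuration(), path)) {
            if (!running) {
                break;
            }
            ctx.collect(row);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

The streaming job would then use env.addSource(new OrcMapSource("/path/to/file.orc")), where the path is a placeholder.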
2. Writing ORC Files
The example below uses orc-core's Writer and VectorizedRowBatch to write a small list of rows to a local ORC file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class OrcFileWriter {

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> data = new LinkedList<>();
        data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
        data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
        data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));
        write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);
    }

    public static void write(Configuration config, String path, String struct, List<Map<String, Object>> data) throws IOException {
        TypeDescription schema = TypeDescription.fromString(struct);
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
        BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
        DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];
        try (Writer writer = OrcFile.createWriter(new Path(path), OrcFile.writerOptions(config).setSchema(schema))) {
            for (Map<String, Object> row : data) {
                int rowNum = batch.size++;
                orderIdColumnVector.vector[rowNum] = (Integer) row.get("order_id");
                byte[] buffer = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
                itemNameColumnVector.setRef(rowNum, buffer, 0, buffer.length);
                priceColumnVector.vector[rowNum] = (Float) row.get("price");
                // flush the batch once it is full
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }
            // write the last partially filled batch
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }
        }
    }
}
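To drive this writer from a Flink job instead of a main method, the same column-vector logic can be wrapped in the OutputFormat interface mentioned in the preface. The sketch below is illustrative only: it hard-codes the three-column orders schema from the example, invents the class name OrcOutputFormat, and simply writes one file per parallel subtask.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

// Illustrative OutputFormat that reuses the column-vector logic above for the
// fixed orders schema. Class name, per-subtask file naming, and the schema are
// assumptions, not from the original post.
public class OrcOutputFormat implements OutputFormat<Map<String, Object>> {

    private final String basePath;
    private transient Writer writer;
    private transient VectorizedRowBatch batch;

    public OrcOutputFormat(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        TypeDescription schema =
                TypeDescription.fromString("struct<order_id:int,item_name:string,price:float>");
        // one file per parallel subtask to avoid concurrent writers on one path
        writer = OrcFile.createWriter(
                new Path(basePath + "/part-" + taskNumber + ".orc"),
                OrcFile.writerOptions(new org.apache.hadoop.conf.Configuration()).setSchema(schema));
        batch = schema.createRowBatch();
    }

    @Override
    public void writeRecord(Map<String, Object> row) throws IOException {
        int rowNum = batch.size++;
        ((LongColumnVector) batch.cols[0]).vector[rowNum] = (Integer) row.get("order_id");
        byte[] name = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[1]).setRef(rowNum, name, 0, name.length);
        ((DoubleColumnVector) batch.cols[2]).vector[rowNum] = (Float) row.get("price");
        if (batch.size == batch.getMaxSize()) {
            writer.addRowBatch(batch);
            batch.reset();
        }
    }

    @Override
    public void close() throws IOException {
        if (batch.size != 0) {
            writer.addRowBatch(batch);
        }
        writer.close();
    }
}

A DataSet<Map<String, Object>> could then be written with dataSet.output(new OrcOutputFormat("/tmp/orders")), where the output directory is a placeholder.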