Reading and Writing ORC with Flink
0. Preface
There are two ways to read data in Flink:
* Extend RichSourceFunction and override its methods (Flink streaming); a minimal skeleton follows this list
* Check whether the official documentation already provides a connector (Flink streaming and Flink DataSet)
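For reference, here is a minimal skeleton of the first option. The class name OrcRowSource and the emitted Map record type are illustrative assumptions, not code from a real job; the actual reading logic (for example the ORC readers shown later in this post) would go inside run().
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.Map;

// Skeleton only: OrcRowSource is a hypothetical name.
public class OrcRowSource extends RichSourceFunction<Map<String, Object>> {
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open readers / clients for the underlying storage here
    }

    @Override
    public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
        // read rows and emit them; 'running' lets cancel() stop the loop
        while (running) {
            // ctx.collect(row);
            running = false; // placeholder so this skeleton terminates
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}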
There are also two ways to write data to storage in Flink:
* Extend RichSinkFunction and override its methods (Flink streaming); a minimal skeleton follows this list
* Implement the OutputFormat interface (Flink streaming and Flink DataSet); a full ORC example follows the writer code in section 2
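And the corresponding skeleton for a custom sink. Again, OrcRowSink and the record type are illustrative only; the ORC-specific details (Writer, row batch) are filled in by the writer code in section 2.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.util.Map;

// Skeleton only: OrcRowSink is a hypothetical name.
public class OrcRowSink extends RichSinkFunction<Map<String, Object>> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // create the ORC Writer and row batch here
    }

    @Override
    public void invoke(Map<String, Object> value, Context context) throws Exception {
        // append one record to the current batch, flushing when it is full
    }

    @Override
    public void close() throws Exception {
        // flush the last batch and close the Writer
    }
}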
1. Reading ORC Files
Approach 1: this route pulls in Hadoop as a transitive dependency. It runs fine locally, but on the cluster the Hadoop dependencies caused the job to fail at startup.
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.4.0</version>
</dependency>
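One possible mitigation, which was not verified on the cluster here, is to exclude the transitive Hadoop artifacts and rely on the Hadoop classpath the cluster already provides:
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.4.0</version>
    <exclusions>
        <!-- assumption: the cluster already ships these Hadoop classes -->
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>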
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcMapredRecordReader;
import org.apache.orc.mapred.OrcStruct;
import org.junit.Test; // assuming JUnit 4
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class OcrfTest {

    @Test
    public void ocrfReadTest() throws IOException {
        String fileName = "/Users/garfield/Downloads/attempt_20211202020436_135940_m_000194_1012_6011504.orcf";
        List<OrcStruct> orcStructs = localOrcFileToList(fileName);
    }

    public static List<OrcStruct> localOrcFileToList(String filename) throws IOException {
        Path testFilePath = new Path(filename);
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();
        TypeDescription schema = reader.getSchema();
        List<TypeDescription> children = schema.getChildren();
        VectorizedRowBatch batch = schema.createRowBatch();
        int numberOfChildren = children.size();
        List<OrcStruct> resultList = new ArrayList<>();
        while (rows.nextBatch(batch)) {
            for (int r = 0; r < batch.size; r++) {
                OrcStruct result = new OrcStruct(schema);
                // copy each column of row r into the OrcStruct
                for (int i = 0; i < numberOfChildren; ++i) {
                    result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], r,
                            children.get(i), result.getFieldValue(i)));
                }
                resultList.add(result);
                // only keep the first few rows of each batch
                if (r == 10) {
                    break;
                }
            }
        }
        rows.close();
        return resultList;
    }
}

Approach 2: read the ORC file with plain Java and either wrap that reader in a custom RichSourceFunction (stream processing) or pass the file path into the job and read it inside a map function (batch processing).
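For the batch variant, here is a minimal sketch, assuming the OrcReader helper defined below and a hypothetical input path: the job ships the path as an ordinary element and does the actual ORC reading inside a flatMap.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import java.util.Map;

public class OrcBatchReadJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // each element is just a path; the ORC file is opened inside the flatMap
        env.fromElements("/path/to/input.orc") // hypothetical path
           .flatMap(new FlatMapFunction<String, Map<String, Object>>() {
               @Override
               public void flatMap(String path, Collector<Map<String, Object>> out) throws Exception {
                   for (Map<String, Object> row : OrcReader.read(new Configuration(), path)) {
                       out.collect(row);
                   }
               }
           })
           .print();
    }
}
The streaming variant would wrap the same OrcReader.read call inside a RichSourceFunction, as sketched in the preface.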
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.7.1</version>
</dependency>
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

public class OrcReader {

    private static final int BATCH_SIZE = 2048;

    public static List<Map<String, Object>> read(Configuration configuration, String path)
            throws IOException {
        List<Map<String, Object>> rows = new LinkedList<>();
        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            TypeDescription schema = reader.getSchema();
            System.out.println(schema.toString());
            try (RecordReader records = reader.rows(reader.options())) {
                VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
                // the column indexes below match this particular file's schema
                BytesColumnVector q36ColumnVector = (BytesColumnVector) batch.cols[1];
                LongColumnVector firstTsColumnVector = (LongColumnVector) batch.cols[3];
                LongColumnVector lastTsColumnVector = (LongColumnVector) batch.cols[4];
                BytesColumnVector dtAccountIdColumnVector = (BytesColumnVector) batch.cols[11];
                while (records.nextBatch(batch)) {
                    for (int rowNum = 0; rowNum < batch.size; rowNum++) {
                        Map<String, Object> map = new HashMap<>();
                        map.put("qimei36", q36ColumnVector.toString(rowNum));
                        map.put("firstTs", firstTsColumnVector.vector[rowNum]);
                        map.put("lastTs", lastTsColumnVector.vector[rowNum]);
                        map.put("dtAccountId", dtAccountIdColumnVector.toString(rowNum));
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }
}

2. Writing ORC Files
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class OrcFileWriter {

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> data = new LinkedList<>();
        data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
        data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
        data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));
        write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);
    }

    public static void write(Configuration config, String path, String struct, List<Map<String, Object>> data) throws IOException {
        TypeDescription schema = TypeDescription.fromString(struct);
        VectorizedRowBatch batch = schema.createRowBatch();
        // int maps to LongColumnVector, string to BytesColumnVector, float to DoubleColumnVector
        LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
        BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
        DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];
        try (Writer writer = OrcFile.createWriter(new Path(path), OrcFile.writerOptions(config).setSchema(schema))) {
            for (Map<String, Object> row : data) {
                int rowNum = batch.size++;
                orderIdColumnVector.vector[rowNum] = (Integer) row.get("order_id");
                byte[] buffer = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
                itemNameColumnVector.setRef(rowNum, buffer, 0, buffer.length);
                priceColumnVector.vector[rowNum] = (Float) row.get("price");
                // flush a full batch and start a new one
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }
            // flush the last partially filled batch
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }
        }
    }
}
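To plug this writer into a Flink job via the OutputFormat route mentioned in the preface, a rough sketch could look like the following. The class name OrcRowOutputFormat, the per-subtask file naming, and the column handling (which mirrors the orders schema above) are all illustrative assumptions, not a definitive implementation.
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical class and field names; the batching logic mirrors OrcFileWriter.write above.
public class OrcRowOutputFormat implements OutputFormat<Map<String, Object>> {
    private final String basePath;
    private final String struct;
    private transient Writer writer;
    private transient VectorizedRowBatch batch;

    public OrcRowOutputFormat(String basePath, String struct) {
        this.basePath = basePath;
        this.struct = struct;
    }

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        TypeDescription schema = TypeDescription.fromString(struct);
        batch = schema.createRowBatch();
        // one file per parallel subtask
        writer = OrcFile.createWriter(
                new Path(basePath + "/part-" + taskNumber + ".orc"),
                OrcFile.writerOptions(new org.apache.hadoop.conf.Configuration()).setSchema(schema));
    }

    @Override
    public void writeRecord(Map<String, Object> row) throws IOException {
        int rowNum = batch.size++;
        ((LongColumnVector) batch.cols[0]).vector[rowNum] = ((Number) row.get("order_id")).longValue();
        byte[] name = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[1]).setRef(rowNum, name, 0, name.length);
        ((DoubleColumnVector) batch.cols[2]).vector[rowNum] = ((Number) row.get("price")).doubleValue();
        if (batch.size == batch.getMaxSize()) {
            writer.addRowBatch(batch);
            batch.reset();
        }
    }

    @Override
    public void close() throws IOException {
        if (batch.size != 0) {
            writer.addRowBatch(batch);
        }
        writer.close();
    }
}
A DataSet<Map<String, Object>> could then be written with output(new OrcRowOutputFormat(...)); the output directory and schema string are again placeholders.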