1. Overview
Apache Avro is more than a data serialization format: it also defines a well-specified IPC message protocol and a higher-level RPC layer on top of it, which makes writing RPC services with Avro straightforward.
Avro uses three different file formats: schema definitions (.avsc), protocol definitions (.avpr), and the Avro interface definition language (IDL, .avdl).
2. Adding the Maven dependency
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
</dependency>
3. Writing the .avsc file
Create a directory avro under src/main, and inside it a file user.avsc. An .avsc file is plain JSON, so it cannot contain comments; here namespace corresponds to a Java package, "type": "record" to a Java class, and name to the class name, while the union type ["string", "null"] makes address nullable.
{
    "namespace": "com.example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "address", "type": ["string", "null"]}
    ]
}
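If address should instead default to null when a writer omits it, note that in an Avro union the default value must match the first branch of the union, so "null" has to come first. A variant of the schema above as illustration:

```json
{
    "namespace": "com.example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "address", "type": ["null", "string"], "default": null}
    ]
}
```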
4. Compiling the .avsc file with avro-tools
Download avro-tools.jar and use it to compile the schema into Java source files under src/main/java:
java -jar avro-tools-1.11.0.jar compile schema src/main/avro/user.avsc src/main/java
Here src/main/avro/user.avsc is the input schema and src/main/java is the output directory for the generated classes.
5. Compiling the .avsc file with the Maven plugin
Alternatively, bind the avro-maven-plugin's schema goal to the generate-sources phase, so the classes are regenerated on every build (e.g. by mvn generate-sources or mvn compile):
<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>11</source>
                <target>11</target>
            </configuration>
        </plugin>
    </plugins>
</build>
6. Usage
Generating classes is optional: Avro only needs the .avsc schema to describe the data format, and data can be written with a DatumWriter plus an Encoder and read back with a DatumReader plus a Decoder. Still, when several fields are handled together, wrapping them in a generated Java class can make further processing easier.
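As a minimal sketch of that DatumWriter + Encoder / DatumReader + Decoder path, with no generated class and no container file (the class name RawBinaryDemo is illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class RawBinaryDemo {
    public static void main(String[] args) throws IOException {
        // The same schema as user.avsc, parsed from a string
        Schema schema = new Schema.Parser().parse(
                "{\"namespace\":\"com.example.avro\",\"type\":\"record\",\"name\":\"User\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"},"
                + "{\"name\":\"address\",\"type\":[\"string\",\"null\"]}]}");

        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Tom");
        user.put("age", 21);
        user.put("address", "beijing");

        // DatumWriter + Encoder: raw binary output, no container-file header
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(user, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();

        // DatumReader + Decoder: read the bytes back with the same schema
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericRecord copy = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(copy);
    }
}
```

Unlike the DataFileWriter examples below, this byte array carries no schema, so the reader must be given the same (or a compatible) schema out of band.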
1. Serialization
The three tests below write the same three records in three ways: with the generated User class, with GenericRecord plus a schema parsed from user.avsc, and with GenericRecord plus a schema assembled via SchemaBuilder (imports from org.apache.avro and JUnit are omitted for brevity).
@Test
public void write1() throws IOException {
    User user1 = new User();
    user1.setName("Tom");
    user1.setAge(21);
    user1.setAddress("beijing");
    User user2 = new User("Jack", 22, "shanghai");
    User user3 = User.newBuilder()
            .setName("Harry")
            .setAge(23)
            .setAddress("hangzhou")
            .build();
    DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
    DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
    dataFileWriter.create(user1.getSchema(), new File("users.avro"));
    dataFileWriter.append(user1);
    dataFileWriter.append(user2);
    dataFileWriter.append(user3);
    dataFileWriter.close();
}
@Test
public void write2() throws IOException {
    Schema schema = new Schema.Parser().parse(new File("./src/main/avro/user.avsc"));
    GenericRecord user1 = new GenericData.Record(schema);
    user1.put("name", "Tom");
    user1.put("age", 21);
    user1.put("address", "beijing");
    GenericRecord user2 = new GenericData.Record(schema);
    user2.put("name", "Jack");
    user2.put("age", 22);
    user2.put("address", "shanghai");
    GenericRecord user3 = new GenericData.Record(schema);
    user3.put("name", "Harry");
    user3.put("age", 23);
    user3.put("address", "hangzhou");
    DatumWriter<GenericRecord> userDatumWriter = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(userDatumWriter);
    dataFileWriter.create(schema, new File("users.avro"));
    dataFileWriter.append(user1);
    dataFileWriter.append(user2);
    dataFileWriter.append(user3);
    dataFileWriter.close();
}
@Test
public void write3() throws IOException {
    Schema schema = SchemaBuilder.builder("com.example.avro").record("User").fields()
            .name("name").type(SchemaBuilder.builder().stringType()).noDefault()
            .name("age").type(SchemaBuilder.builder().intType()).noDefault()
            .name("address").type(SchemaBuilder.builder().nullable().stringType()).noDefault()
            .endRecord();
    GenericRecord user1 = new GenericData.Record(schema);
    user1.put("name", "Tom");
    user1.put("age", 21);
    user1.put("address", "beijing");
    GenericRecord user2 = new GenericData.Record(schema);
    user2.put("name", "Jack");
    user2.put("age", 22);
    user2.put("address", "shanghai");
    GenericRecord user3 = new GenericData.Record(schema);
    user3.put("name", "Harry");
    user3.put("age", 23);
    user3.put("address", "hangzhou");
    DatumWriter<GenericRecord> userDatumWriter = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(userDatumWriter);
    dataFileWriter.create(schema, new File("users.avro"));
    dataFileWriter.append(user1);
    dataFileWriter.append(user2);
    dataFileWriter.append(user3);
    dataFileWriter.close();
}
2. Deserialization
The readers mirror the three writers; DataFileReader iterates over the records in the file and should be closed when done.
@Test
public void reader1() throws IOException {
    DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class);
    DataFileReader<User> dataFileReader = new DataFileReader<>(new File("users.avro"), userDatumReader);
    while (dataFileReader.hasNext()) {
        User user = dataFileReader.next();
        System.out.println(user);
    }
    dataFileReader.close();
}
@Test
public void reader2() throws IOException {
    Schema schema = new Schema.Parser().parse(new File("./src/main/avro/user.avsc"));
    DatumReader<GenericRecord> userDatumReader = new GenericDatumReader<>(schema);
    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File("users.avro"), userDatumReader);
    while (dataFileReader.hasNext()) {
        GenericRecord user = dataFileReader.next();
        System.out.println(user);
    }
    dataFileReader.close();
}
@Test
public void reader3() throws IOException {
    Schema schema = SchemaBuilder.builder("com.example.avro").record("User").fields()
            .name("name").type(SchemaBuilder.builder().stringType()).noDefault()
            .name("age").type(SchemaBuilder.builder().intType()).noDefault()
            .name("address").type(SchemaBuilder.builder().nullable().stringType()).noDefault()
            .endRecord();
    DatumReader<GenericRecord> userDatumReader = new GenericDatumReader<>(schema);
    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File("users.avro"), userDatumReader);
    while (dataFileReader.hasNext()) {
        GenericRecord user = dataFileReader.next();
        System.out.println(user);
    }
    dataFileReader.close();
}
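Because DataFileWriter embeds the schema in the file header, users.avro can also be read without supplying any schema at all: a no-argument GenericDatumReader picks up the writer's schema from the file. A sketch in the same style as the readers above (the class name ReadWithoutSchema is illustrative):

```java
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadWithoutSchema {
    public static void main(String[] args) throws IOException {
        // With no schema given, the reader uses the schema embedded in the file
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
        try (DataFileReader<GenericRecord> fileReader =
                     new DataFileReader<>(new File("users.avro"), reader)) {
            System.out.println("writer schema: " + fileReader.getSchema());
            for (GenericRecord user : fileReader) {
                System.out.println(user);
            }
        }
    }
}
```

This is essentially what avro-tools tojson does internally, and it is why container files are self-describing.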
3. The generated .avro file
The raw contents of users.avro (the embedded schema, followed by the binary-encoded records and sync markers):
Objavro.schema�{"type":"record","name":"User","namespace":"com.example.avro","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"address","type":["string","null"]}]} 2�n��r��D�^Tom* beijingJack, shanghai
Harry. hangzhou2�n��r��D�
You can run java -jar avro-tools-1.11.0.jar tojson users.avro to dump the file written by DataFileWriter as JSON:
{"name":"Tom","age":21,"address":{"string":"beijing"}}
{"name":"Jack","age":22,"address":{"string":"shanghai"}}
{"name":"Harry","age":23,"address":{"string":"hangzhou"}}