rpc · 2022-06-30 0

avro 序列化与反序列化

一、概述

Apache Avro 不仅是一个数据序列化系统,还定义了一套完善的 IPC 消息协议,并在其之上提供了更高层的 RPC 封装,使基于 Apache Avro 的 RPC 调用更加便捷。

Apache Avro 提供了三种不同的文件格式,分别用于定义数据序列化模式(Schema)、接口协议(Protocol)和接口定义语言(IDL),对应的文件扩展名分别为 .avsc、.avpr 和 .avdl。

二、maven引入

<!-- Avro runtime library (Schema, GenericRecord, DatumWriter/DatumReader, DataFileWriter/DataFileReader);
     same version (1.11.0) as the avro-tools jar and avro-maven-plugin used below -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
</dependency>

三、编写 avsc 文件

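在 src/main 下新建目录 avro,并在 avro 目录下新建文件 user.avsc。(注:标准 JSON 并不支持 // 注释。下面示例中的注释仅作说明,Avro 自带的 Schema 解析器可以容忍这些注释;但如果该文件还需要被其他 JSON 工具处理,建议删除注释。)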

{
    "namespace": "com.example.avro", // comparable to a Java package declaration
    "type": "record", // a record corresponds to a Java class
    "name": "User", // class name, i.e. com.example.avro.User
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age",  "type": "int"},
        // union of string and null: address may hold a string or be null
        {"name": "address",  "type": ["string", "null"]}
    ]
}

四、avro-tools编译 avsc 文件

下载 avro-tools.jar,用它编译 avsc 文件,并将生成的 Java 源码输出到 java 目录下:

java -jar avro-tools-1.11.0.jar compile schema src/main/avro/user.avsc src/main/java

其中 src/main/avro/user.avsc 为待编译的 schema 文件,src/main/java 为生成代码的目标目录。

五、使用maven插件编译 avsc 文件

    <dependencies>

        <!-- Avro runtime used by the serialization / deserialization code -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Generates Java classes from the .avsc schema files during the build -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.11.0</version>
                <executions>
                    <execution>
                        <!-- bound to generate-sources so the classes exist before compilation -->
                        <phase>generate-sources</phase>
                        <goals>
                            <!-- the "schema" goal compiles .avsc files -->
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <!-- directory the .avsc files are read from -->
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <!-- generated classes are written into the regular source tree -->
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Compile for Java 11. NOTE(review): plugin version is not pinned here -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

六、使用

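Avro 的代码生成并不是必需的:只要提供 schema(avsc 文件,或在代码中用 SchemaBuilder 构建),就可以用 GenericRecord 配合 DatumWriter + Encoder 输出数据、DatumReader + Decoder 读入数据。下文的 write2/write3 和 reader2/reader3 都不依赖生成类。不过,在数据读写时如果需要同时处理多个字段,使用生成的 Java 类(如 write1/reader1 中的 User)可以获得类型安全的 getter/setter,更便于后续操作。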

1. 序列化

    /**
     * Serializes three {@code User} records using the generated (specific) API.
     *
     * <p>Demonstrates the three ways to create a generated record: the no-arg constructor plus
     * setters, the all-args constructor, and the builder. All three are appended to the Avro
     * data file {@code users.avro} in the working directory.
     *
     * @throws IOException if the file cannot be created or written
     */
    @Test
    public void write1() throws IOException {
        User user1 = new User();
        user1.setName("Tom");
        user1.setAge(21);
        user1.setAddress("beijing");

        User user2 = new User("Jack", 22, "shanghai");

        // Per the Avro guide, builders validate values as they are set and apply schema defaults.
        User user3 = User.newBuilder()
                .setName("Harry")
                .setAge(23)
                .setAddress("hangzhou")
                .build();

        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
        // try-with-resources flushes and closes the file even if create()/append() throws.
        try (DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
            // The schema passed here is stored in the file header alongside the data.
            dataFileWriter.create(user1.getSchema(), new File("users.avro"));
            dataFileWriter.append(user1);
            dataFileWriter.append(user2);
            dataFileWriter.append(user3);
        }
    }

    /**
     * Serializes three records with the generic API, without any generated class.
     *
     * <p>The schema is parsed from {@code ./src/main/avro/user.avsc}. The records are built as
     * {@link GenericRecord}s and written to {@code users.avro} in the working directory.
     *
     * @throws IOException if the schema cannot be read or the data file cannot be written
     */
    @Test
    public void write2() throws IOException {
        Schema schema = new Schema.Parser().parse(new File("./src/main/avro/user.avsc"));

        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Tom");
        user1.put("age", 21);
        user1.put("address", "beijing");

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Jack");
        user2.put("age", 22);
        user2.put("address", "shanghai");

        GenericRecord user3 = new GenericData.Record(schema);
        user3.put("name", "Harry");
        user3.put("age", 23);
        user3.put("address", "hangzhou");

        DatumWriter<GenericRecord> userDatumWriter = new GenericDatumWriter<>(schema);
        // try-with-resources flushes and closes the file even if create()/append() throws.
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
            dataFileWriter.create(schema, new File("users.avro"));
            dataFileWriter.append(user1);
            dataFileWriter.append(user2);
            dataFileWriter.append(user3);
        }
    }

    /**
     * Serializes three records with the generic API, using a schema built in code.
     *
     * <p>The schema is built with {@link SchemaBuilder} rather than read from a file. It has the
     * same fields as {@code user.avsc}: {@code nullable().stringType()} produces the union
     * {@code ["string", "null"]}. The records are written to {@code users.avro} in the working
     * directory.
     *
     * @throws IOException if the data file cannot be written
     */
    @Test
    public void write3() throws IOException {
        Schema schema = SchemaBuilder.builder("com.example.avro").record("User").fields()
                .name("name").type(SchemaBuilder.builder().stringType()).noDefault()
                .name("age").type(SchemaBuilder.builder().intType()).noDefault()
                .name("address").type(SchemaBuilder.builder().nullable().stringType()).noDefault()
                .endRecord();

        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Tom");
        user1.put("age", 21);
        user1.put("address", "beijing");

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Jack");
        user2.put("age", 22);
        user2.put("address", "shanghai");

        GenericRecord user3 = new GenericData.Record(schema);
        user3.put("name", "Harry");
        user3.put("age", 23);
        user3.put("address", "hangzhou");

        DatumWriter<GenericRecord> userDatumWriter = new GenericDatumWriter<>(schema);
        // try-with-resources flushes and closes the file even if create()/append() throws.
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
            dataFileWriter.create(schema, new File("users.avro"));
            dataFileWriter.append(user1);
            dataFileWriter.append(user2);
            dataFileWriter.append(user3);
        }
    }

2. 反序列化

    /**
     * Reads {@code users.avro} back into generated {@code User} objects and prints each one.
     *
     * @throws IOException if the file cannot be opened or decoded
     */
    @Test
    public void reader1() throws IOException {
        DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class);
        // try-with-resources releases the underlying file handle when reading finishes or fails.
        try (DataFileReader<User> dataFileReader =
                new DataFileReader<>(new File("users.avro"), userDatumReader)) {
            User user = null;
            while (dataFileReader.hasNext()) {
                // Passing the previous record lets Avro reuse it instead of allocating one per entry.
                // This is safe because each record is printed before the next read overwrites it.
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }
    }

    /**
     * Reads {@code users.avro} as {@link GenericRecord}s and prints each one.
     *
     * <p>The reader schema is parsed from {@code ./src/main/avro/user.avsc}. No generated class
     * is needed.
     *
     * @throws IOException if the schema or the data file cannot be read
     */
    @Test
    public void reader2() throws IOException {
        Schema schema = new Schema.Parser().parse(new File("./src/main/avro/user.avsc"));
        DatumReader<GenericRecord> userDatumReader = new GenericDatumReader<>(schema);
        // try-with-resources releases the underlying file handle when reading finishes or fails.
        try (DataFileReader<GenericRecord> dataFileReader =
                new DataFileReader<>(new File("users.avro"), userDatumReader)) {
            GenericRecord user = null;
            while (dataFileReader.hasNext()) {
                // Reuse the previous record object; it is printed before being overwritten.
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }
    }

    /**
     * Reads {@code users.avro} as {@link GenericRecord}s, using a reader schema built in code.
     *
     * <p>The schema is built with {@link SchemaBuilder} and has the same fields as
     * {@code user.avsc}. Each record is printed.
     *
     * @throws IOException if the data file cannot be read
     */
    @Test
    public void reader3() throws IOException {
        Schema schema = SchemaBuilder.builder("com.example.avro").record("User").fields()
                .name("name").type(SchemaBuilder.builder().stringType()).noDefault()
                .name("age").type(SchemaBuilder.builder().intType()).noDefault()
                .name("address").type(SchemaBuilder.builder().nullable().stringType()).noDefault()
                .endRecord();
        DatumReader<GenericRecord> userDatumReader = new GenericDatumReader<>(schema);
        // try-with-resources releases the underlying file handle when reading finishes or fails.
        try (DataFileReader<GenericRecord> dataFileReader =
                new DataFileReader<>(new File("users.avro"), userDatumReader)) {
            GenericRecord user = null;
            while (dataFileReader.hasNext()) {
                // Reuse the previous record object; it is printed before being overwritten.
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }
    }

3. 生成的 avro 文件

users.avro(这是一个二进制的 Avro 数据文件,文件头中保存了写入时使用的 schema。用文本方式打开时,除 schema JSON 和字符串内容外,其余字节都会显示为乱码):

Objavro.schema�{"type":"record","name":"User","namespace":"com.example.avro","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"address","type":["string","null"]}]} 2�n��r��D�^Tom* beijingJack, shanghai
Harry. hangzhou2�n��r��D�

可以使用 java -jar avro-tools-1.11.0.jar tojson users.avro 命令,将 DataFileWriter 写出的二进制文件转换为 JSON 格式查看。注意,联合类型(union)字段在 JSON 中会带上所选分支的类型名,例如 address 显示为 {"string":"beijing"}:

{"name":"Tom","age":21,"address":{"string":"beijing"}}
{"name":"Jack","age":22,"address":{"string":"shanghai"}}
{"name":"Harry","age":23,"address":{"string":"hangzhou"}}