如何使用ApacheFlink实现自定义Sink
                                            如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

创新互联长期为上千多家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为源城企业提供专业的成都做网站、成都网站制作、成都外贸网站建设,源城网站改版等技术服务。拥有十余年丰富建站经验和众多成功案例,为您定制开发。
socket发送过来的数据,把String类型转成对象,然后把Java对象保存到MySQL数据库中。
创建数据库和表
create database imooc_flink; create table student( id int(11) NOT NULL AUTO_INCREMENT, name varchar(25), age int(10), primary key(id) )
导入mysql依赖:
mysql mysql-connector-java 8.0.15 
创建POJO Student
package com.vincent.course05;
public class Student {
    private int id;
    private String name;
    private int age;
    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}然后创建连接,SinkToMySQL.java
package com.vincent.course05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class SinkToMySQL extends RichSinkFunction{ PreparedStatement ps; private Connection connection; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into student(id, name, age) values(?, ?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); //关闭连接和释放资源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每条数据的插入都要调用一次 invoke() 方法 * * @param value * @param context * @throws Exception */ @Override public void invoke(Student value, Context context) throws Exception { //组装数据,执行插入操作 ps.setInt(1, value.getId()); ps.setString(2, value.getName()); ps.setInt(3, value.getAge()); ps.executeUpdate(); } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.cj.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); } catch (Exception e) { e.printStackTrace(); System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; } } 
main方法:
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = environment.socketTextStream("192.168.152.45", 9999);
        SingleOutputStreamOperator studentStream = source.map(new MapFunction() {
            @Override
            public Student map(String value) throws Exception {
                String[] splits = value.split(",");
                Student student = new Student();
                student.setId(Integer.parseInt(splits[0]));
                student.setName(splits[1]);
                student.setAge(Integer.parseInt(splits[2]));
                return student;
            }
        });
        studentStream.addSink(new SinkToMySQL());
        environment.execute("JavaCustomSinkToMysql");
    }   从socket中获取数据,数据格式使用逗号分割,在控制台中输入:
nc -lk 9999 1,tom,23
检查数据库,数据库中多了一条数据
mysql> select * from student; +----+------+------+ | id | name | age | +----+------+------+ | 1 | tom | 23 | +----+------+------+ 1 row in set (0.00 sec)
这样就很方便的使用自定义的sink,写入到MySQL中去。
总结:
第一步:继承RichSinkFunction
T就是想要写入的对象类型 第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次
默认情况下open方法的并行度不是1,跟具体的电脑有关系。
看完上述内容,你们掌握如何使用Apache Flink实现自定义Sink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
新闻标题:如何使用ApacheFlink实现自定义Sink
本文链接:http://www.scyingshan.cn/article/ishdsg.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 