翼度科技»论坛 编程开发 mysql 查看内容

Flume如何自定义Sink数据至MySQL

6

主题

6

帖子

18

积分

新手上路

Rank: 1

积分
18
Flume自定义Sink数据至MySQL


一、介绍

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。
Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的Sink类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Sink。
官方也提供了自定义sink的接口:
  1. https://flume.apache.org/FlumeDeveloperGuide.html#sink根据官方说明自定义MySink需要继承AbstractSink类并实现Configurable接口。
复制代码
实现相应方法:

  • configure(Context context)//初始化context(读取配置文件内容)
  • process()//从Channel读取获取数据(event),这个方法将被循环调用。
使用场景:
读取Channel数据写入MySQL或者其他文件系统。

二、需求

使用flume接收(id,name,string)数据,并在Sink端给每条数据进行切分,编写JDBC驱动将数据保存到MySQL数据库。

三、编写MySink
  1. package com.flume.flume;

  2. import org.apache.flume.*;
  3. import org.apache.flume.conf.Configurable;
  4. import org.apache.flume.sink.AbstractSink;

  5. import java.sql.Connection;
  6. import java.sql.DriverManager;
  7. import java.sql.PreparedStatement;
  8. import java.sql.SQLException;

  9. public class MySink extends AbstractSink implements Configurable {

  10.     private String msgPrefix;

  11.     /**
  12.      * 用来保存数据,不断调用次方法
  13.      * @return
  14.      * @throws EventDeliveryException
  15.      */
  16.     @Override
  17.     public Status process() throws EventDeliveryException {
  18.         //获取sink对应的channnel
  19.         Channel channel = getChannel();
  20.         Connection connection = null;
  21.         PreparedStatement statement = null;
  22.         //获取事务对象
  23.         Transaction transaction = channel.getTransaction();
  24.         try{
  25.             //开启事务
  26.             transaction.begin();
  27.             //从channel中获取数据
  28.             Event event = channel.take();

  29.             //切割数据
  30.             String data = new String(event.getBody());
  31.             String[] arr = data.split(",");
  32.             String id = arr[0];
  33.             String name = arr[1];
  34.             int age = Integer.parseInt(arr[2]);

  35.             //保存到mysql
  36.             //1、获取connect
  37.             connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false","root","123321");
  38.             statement = connection.prepareStatement("insert into test values(?,?,?)");
  39.             saveToMysql(id,name,age,connection,statement);
  40.             //模拟数据保存
  41.             //System.out.println(msgPrefix+":"+new String(take.getBody()));
  42.             //提交事务
  43.             transaction.commit();

  44.             return Status.READY;
  45.         }catch (Exception e){
  46.             transaction.rollback();
  47.         }finally {
  48.             //关闭事务
  49.             transaction.close();
  50.             if(statement!=null)
  51.             //5、关闭
  52.             {
  53.                 try {
  54.                     statement.close();
  55.                 } catch (SQLException e) {
  56.                     e.printStackTrace();
  57.                 }
  58.             }
  59.             if(connection!=null) {
  60.                 try {
  61.                     connection.close();
  62.                 } catch (SQLException e) {
  63.                     e.printStackTrace();
  64.                 }
  65.             }
  66.         }

  67.         return Status.BACKOFF;
  68.     }

  69.     public void saveToMysql(String id,String name,int age,Connection connection,PreparedStatement statement) throws SQLException {

  70.         //2、获取statement对象
  71.         //sql注入 【 select * from table where name='zhangsan' or 1=1】
  72.         //connection.createStatement();

  73.         //3、赋值
  74.         statement.setString(1,id);
  75.         statement.setString(2,name);
  76.         statement.setInt(3,age);
  77.         System.out.println(id+","+name+","+age);
  78.         //4、保存
  79.         statement.executeUpdate();


  80.     }
  81.     /**
  82.      * 获取sink的配置属性
  83.      * @param context
  84.      */
  85.     @Override
  86.     public void configure(Context context) {

  87.         msgPrefix = context.getString("msg.prefix");

  88.     }
  89. }
复制代码
四、编写Flume脚本
  1. #定义agent
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = k1

  5. #定义source
  6. a1.sources.r1.type = netcat
  7. a1.sources.r1.bind = 0.0.0.0
  8. a1.sources.r1.port = 9999

  9. #定义channel
  10. a1.channels.c1.type = memory
  11. a1.channels.c1.capacity = 1000
  12. a1.channels.c1.transactionCapacity = 1000

  13. #定义sink
  14. a1.sinks.k1.type = com.atguigu.flume.MySink
  15. a1.sinks.k1.msg.prefix = message

  16. #定义source、channel、sink之间的绑定关系
  17. a1.sources.r1.channels = c1
  18. a1.sinks.k1.channel = c1
复制代码
五、测试

1.启动flume
  1. [hadoop@hadoop102 ~]$ cd /opt/module/flume/
  2. [hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysik.config -Dflume.root.logger=INFO,console
复制代码
2.启动nc端口
  1. [hadoop@hadoop102 ~]$ nc hadoop102 9999
  2. 1,ttt,8
  3. OK
复制代码
3.客户端输出

4.查看MySQL数据库


总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

来源:https://www.jb51.net/database/328292d4v.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具