
1. Extend the RichSinkFunction class

  Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>xml-apis</artifactId>
            <groupId>xml-apis</groupId>
        </exclusion>
    </exclusions>
</dependency>

  Config: the HBase connection parameters (ZooKeeper quorum, port, table name) are kept in a properties file and registered with the job as global parameters.


  Code to load the config in the Flink job:

  

    public static void main(String[] args) throws Exception {
        /*
         * Env and Config
         */
        // configEnv is a class-level field holding the default environment name;
        // the first program argument overrides it.
        if (args.length > 0) {
            configEnv = args[0];
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String confName = String.format("xxx.%s.properties", configEnv);
        InputStream in = MidasCtr.class.getClassLoader().getResourceAsStream(confName);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(in);
        env.getConfig().setGlobalJobParameters(parameterTool);
    }
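
  The properties file itself is not shown in the post. A minimal sketch of what it could contain, assuming only the three keys the sink below reads (the host names and table name are placeholders; the file name follows the xxx.%s.properties pattern above):

# Sketch of the properties file loaded above; keys match what WriteHbaseRich reads in open().
# The values are placeholders, not from the original post.
hbase.zkServer=zk-host-1,zk-host-2,zk-host-3
hbase.zkPort=2181
hbase.tableName=ad_click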

  

  Sink code:

package midas.knowbox;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteHbaseRich extends RichSinkFunction<AdDot> {
    private Connection conn = null;
    private Table table = null;

    private static String zkServer;
    private static String zkPort;
    private static TableName tableName;

    private static final String click = "click";

    BufferedMutatorParams params;
    BufferedMutator mutator;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the HBase settings from the global job parameters set in main()
        ParameterTool para = (ParameterTool)
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        zkServer = para.getRequired("hbase.zkServer");
        zkPort = para.getRequired("hbase.zkPort");
        String tName = para.getRequired("hbase.tableName");
        tableName = TableName.valueOf(tName);

        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", zkServer);
        config.set("hbase.zookeeper.property.clientPort", zkPort);
        conn = ConnectionFactory.createConnection(config);

        // Create the table (one column family, 3 versions) if it does not exist yet
        Admin admin = conn.getAdmin();
        if (!admin.tableExists(tableName)) {
            HTableDescriptor tableDes = new HTableDescriptor(tableName);
            tableDes.addFamily(new HColumnDescriptor(click).setMaxVersions(3));
            System.out.println("create table");
            admin.createTable(tableDes);
        }
        // Get a handle to the table
        table = conn.getTable(tableName);
        // Set up a buffered writer
        params = new BufferedMutatorParams(tableName);
        params.writeBufferSize(1024);
        mutator = conn.getBufferedMutator(params);
    }

    @Override
    public void invoke(AdDot record, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(String.valueOf(record.userID)));
        System.out.println("hbase write");
        System.out.println(record.recent10Data);
        put.addColumn(Bytes.toBytes(click), Bytes.toBytes("recent_click"),
                Bytes.toBytes(String.valueOf(record.toJson())));
        mutator.mutate(put);
        System.out.println("hbase write");
    }

    @Override
    public void close() throws Exception {
        mutator.flush();
        conn.close();
    }
}

  Usage:

dataStream.addSink(new WriteHbaseRich());
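
  The AdDot event type used by both sinks is not included in the post. A hypothetical sketch, inferred only from the fields and the toJson() call that appear in the code (the field types are assumptions):

package midas.knowbox;

// Hypothetical reconstruction of the event POJO used by both sinks.
// Only the field names and toJson() appear in the post; the types are assumed.
public class AdDot {
    public int userID;          // user identifier, used as (part of) the row key
    public int adID;            // ad identifier
    public long actionTime;     // click timestamp
    public String recent10Data; // recent click history written to the "recent_click" column

    public String toJson() {
        // Placeholder serialization; the original implementation is not shown.
        return String.format(
                "{\"userID\":%d,\"adID\":%d,\"actionTime\":%d,\"recent10Data\":\"%s\"}",
                userID, adID, actionTime, recent10Data);
    }
}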

2. Implement the OutputFormat interface (not sure how to use Flink's config file with this approach)

  

package midas.knowbox;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class WriteHbase implements OutputFormat<AdDot> {
    private Connection conn = null;
    private Table table = null;

    // Connection settings are hard-coded here instead of coming from the config file
    private static String zkServer = "";
    private static String port = "2181";
    private static TableName tableName = TableName.valueOf("test");

    private static final String userCf = "user";
    private static final String adCf = "ad";

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", zkServer);
        config.set("hbase.zookeeper.property.clientPort", port);
        conn = ConnectionFactory.createConnection(config);

        Admin admin = conn.getAdmin();
        if (!admin.tableExists(tableName)) {
            // Table descriptor
            HTableDescriptor tableDes = new HTableDescriptor(tableName);
            // Column families
            tableDes.addFamily(new HColumnDescriptor(userCf));
            tableDes.addFamily(new HColumnDescriptor(adCf));
            // Create the table
            admin.createTable(tableDes);
        }
        table = conn.getTable(tableName);
    }

    @Override
    public void writeRecord(AdDot record) throws IOException {
        // Row key: userID_adID_actionTime
        Put put = new Put(Bytes.toBytes(record.userID + "_" + record.adID + "_" + record.actionTime));
        // Arguments: column family, column qualifier, value
        put.addColumn(Bytes.toBytes("user"), Bytes.toBytes("userid"), Bytes.toBytes(record.userID));
        put.addColumn(Bytes.toBytes("ad"), Bytes.toBytes("ad_id"), Bytes.toBytes(record.adID));
        table.put(put);
    }

    @Override
    public void close() throws IOException {
        conn.close();
    }
}
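
  The post does not show how this OutputFormat gets attached to the stream. A minimal sketch, assuming the same dataStream as in method 1:

// Writes each AdDot record through WriteHbase.writeRecord()
dataStream.writeUsingOutputFormat(new WriteHbase());

  As for the config-file question in the heading: ParameterTool is serializable, so one possible workaround (not from the original post) is to pass it to the WriteHbase constructor instead of relying on the global job parameters.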

3. Problems encountered

  A dependency conflict error came up when writing to HBase; excluding xml-apis from hadoop-client fixed it:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>xml-apis</artifactId>
            <groupId>xml-apis</groupId>
        </exclusion>
    </exclusions>
</dependency>

