
Recently I have been responsible for product log instrumentation and collection at my company, and built a log collection system based on Flume + HDFS + Hive.

I. Log collection system architecture:

Here is a simple architecture diagram of the log collection system. As it shows, Flume plays both the agent and collector roles, while HDFS provides persistent storage for the data.

The setup described here is a demo: it uses a single flume collector and stores the data only in HDFS. A highly available log collection architecture would of course need multiple flume collectors for load balancing and fault tolerance.

[Architecture diagram: Flume + HDFS + Hive log collection system]

II. Log generation:

1. log4j configuration: roll a new file every minute, and if a file grows beyond 5 MB within that minute, start another file.

<!-- product analytics log, rolled per minute -->
<RollingRandomAccessFile name="RollingFile_product_minute"
fileName="${STAT_LOG_HOME}/${SERVER_NAME}_product.log"
filePattern="${STAT_LOG_HOME}/${SERVER_NAME}_product.log.%d{yyyy-MM-dd-HH-mm}-%i">
<PatternLayout charset="UTF-8"
pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %level - %msg%xEx%n" />
<Policies>
<TimeBasedTriggeringPolicy interval="1"
modulate="true" />
<SizeBasedTriggeringPolicy size="${EVERY_FILE_SIZE}" />
</Policies>
<Filters>
<ThresholdFilter level="INFO" onMatch="ACCEPT"
onMismatch="NEUTRAL" />
</Filters>
</RollingRandomAccessFile>

The rolled files look like this:

[Screenshot: rolled log files]

2. Log content

Each record is JSON. The top-level fields, in order, are: tableName, logRequest, timestamp, statBody, logResponse, resultCode, resultFailMsg.

2016-11-30 09:18:21.916 INFO - {    "tableName": "ReportView",    "logRequest": {         ***    },    "timestamp": 1480468701432,    "statBody": {        ***    },    "logResponse": {        ***    },    "resultCode": 1,    "resultFailMsg": ""}
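For reference, a minimal producer-side sketch of how such a record could be written with fastjson and slf4j. The StatLogger class and logReportView method are hypothetical, not taken from the original project, and it assumes the logger is routed to the RollingFile_product_minute appender shown above:

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical producer-side sketch: builds one stat-log record with the field
// order shown above and writes it as a single JSON line; log4j2's PatternLayout
// adds the "2016-11-30 09:18:21.916 INFO - " prefix.
public class StatLogger {
    private static final Logger LOG = LoggerFactory.getLogger(StatLogger.class);

    public static void logReportView(JSONObject logRequest, JSONObject statBody,
                                     JSONObject logResponse, int resultCode, String resultFailMsg) {
        JSONObject record = new JSONObject(true);   // true = keep insertion order
        record.put("tableName", "ReportView");
        record.put("logRequest", logRequest);
        record.put("timestamp", System.currentTimeMillis());
        record.put("statBody", statBody);
        record.put("logResponse", logResponse);
        record.put("resultCode", resultCode);
        record.put("resultFailMsg", resultFailMsg);
        LOG.info(record.toJSONString());
    }
}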

III. Flume configuration

For the virtual machine environment, see my blog post: http://www.cnblogs.com/xckk/p/6000881.html

For the hadoop environment, see my other post: http://www.cnblogs.com/xckk/p/6124553.html

The Flume environment here is:

centos1:flume-agent

centos2:flume-collector

1. Flume agent configuration (conf file)

a1.sources = skydataSource
a1.channels = skydataChannel
a1.sinks = skydataSink

a1.sources.skydataSource.type = spooldir
a1.sources.skydataSource.channels = skydataChannel
# log directory to spool
a1.sources.skydataSource.spoolDir = /opt/flumeSpool
a1.sources.skydataSource.fileHeader = true
# Once a file has been processed it gets a .COMPLETED suffix, and the live .log file rolls every minute,
# so both the live .log file and .COMPLETED files are ignored here
a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
a1.sources.skydataSource.basenameHeader=true
a1.sources.skydataSource.deserializer.maxLineLength=
# Custom interceptor: splits the JSON source log into fields and adds a timestamp header
# for the HDFS sink downstream; the interceptor code is shown later
a1.sources.skydataSource.interceptors=i1
a1.sources.skydataSource.interceptors.i1.type=com.skydata.flume_interceptor.HiveLogInterceptor2$Builder

a1.sinks.skydataSink.type = avro
a1.sinks.skydataSink.channel = skydataChannel
a1.sinks.skydataSink.hostname = centos2
a1.sinks.skydataSink.port =
# With deflate compression configured here, the flume collector must be configured to decompress accordingly
a1.sinks.skydataSink.compression-type=deflate

a1.channels.skydataChannel.type=memory
a1.channels.skydataChannel.capacity=
a1.channels.skydataChannel.transactionCapacity=

2. Flume collector configuration

a1.sources = avroSource
a1.channels = memChannel
a1.sinks = hdfsSink

a1.sources.avroSource.type = avro
a1.sources.avroSource.channels = memChannel
a1.sources.avroSource.bind=centos2
a1.sources.avroSource.port=
# must match the flume agent configuration
a1.sources.avroSource.compression-type=deflate

a1.sinks.hdfsSink.type = hdfs
a1.sinks.hdfsSink.channel = memChannel
# skydata_hive_log is the hive table, partitioned and stored by year-month-day
a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d
a1.sinks.hdfsSink.hdfs.batchSize=
a1.sinks.hdfsSink.hdfs.fileType=DataStream
a1.sinks.hdfsSink.hdfs.writeFormat=Text
a1.sinks.hdfsSink.hdfs.rollSize=
a1.sinks.hdfsSink.hdfs.rollCount=
a1.sinks.hdfsSink.hdfs.rollInterval=

a1.channels.memChannel.type=memory
a1.channels.memChannel.capacity=
a1.channels.memChannel.transactionCapacity=

IV. Hive table creation and partitioning

1. Creating the hive table

After running the CREATE TABLE statement in hive, a new skydata_hive_log directory appears under hdfs://centos1:9000/flume/ (the statement contains a LOCATION clause).

\u0001 is the delimiter hive uses to split the fields; opened with vim on Linux, this character shows up as ^A.

Since the logs are in JSON format, they have to be converted into \u0001-delimited records and partitioned by dt. This is done by a custom Flume interceptor (section V).

CREATE TABLE `skydata_hive_log`(
  `tableName` string,
  `logRequest` string,
  `timestamp` bigint,
  `statBody` string,
  `logResponse` string,
  `resultCode` int,
  `resultFailMsg` string)
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\u0001'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://centos1:9000/flume/skydata_hive_log';

2. Hive table partitions

When flume sinks the logs to HDFS, if the hive table has not been partitioned beforehand, the logs end up in the HDFS directory but the hive table cannot load them, because the corresponding partitions do not exist. Partitions therefore have to be added to the table; here they are added for roughly the last year with the script init_flume_hive_table.sh:

for ((i=-;i<=;i++))
do
    dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    echo date=$dt
    hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>> logs/init_skydata_hive_log.err
done

V. Custom Flume interceptor

Create a new maven project; the interceptor HiveLogInterceptor2 is implemented as follows.

package com.skydata.flume_interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.TimestampInterceptor.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;

public class HiveLogInterceptor2 implements Interceptor
{
    private static Logger logger = LoggerFactory.getLogger(HiveLogInterceptor2.class);

    public static final String HIVE_SEPARATOR = "\001";

    public void close()
    {
    }

    public void initialize()
    {
    }

    public Event intercept(Event event)
    {
        String originalLog = new String(event.getBody(), Charsets.UTF_8);
        try
        {
            String log = parseLog(originalLog);
            // set the timestamp header, used by the HDFS sink for the %Y-%m-%d path
            long now = System.currentTimeMillis();
            Map<String, String> headers = event.getHeaders();
            headers.put(Constants.TIMESTAMP, Long.toString(now));
            event.setBody(log.getBytes(Charsets.UTF_8));
        } catch (Throwable throwable)
        {
            logger.error("error when intercepting log [ " + originalLog + " ]", throwable);
            return null;
        }
        return event;
    }

    public List<Event> intercept(List<Event> list)
    {
        List<Event> events = new ArrayList<Event>();
        for (Event event : list)
        {
            Event interceptedEvent = this.intercept(event);
            if (interceptedEvent != null)
            {
                events.add(interceptedEvent);
            }
        }
        return events;
    }

    private static String parseLog(String log)
    {
        List<String> logFileds = new ArrayList<String>();
        String dt = log.substring(0, 10);
        String keyStr = "INFO - ";
        int index = log.indexOf(keyStr);
        String content = "";
        if (index != -1)
        {
            content = log.substring(index + keyStr.length(), log.length());
        }
        // normalize carriage returns / line feeds across operating systems
        content = content.replaceAll("\r", "");
        content = content.replaceAll("\n", "\\\\" + System.getProperty("line.separator"));

        JSONObject jsonObj = JSONObject.parseObject(content);
        String tableName = jsonObj.getString("tableName");
        String logRequest = jsonObj.getString("logRequest");
        String timestamp = jsonObj.getString("timestamp");
        String statBody = jsonObj.getString("statBody");
        String logResponse = jsonObj.getString("logResponse");
        String resultCode = jsonObj.getString("resultCode");
        String resultFailMsg = jsonObj.getString("resultFailMsg");

        // one field per hive column, with dt appended last as the partition value
        logFileds.add(tableName);
        logFileds.add(logRequest);
        logFileds.add(timestamp);
        logFileds.add(statBody);
        logFileds.add(logResponse);
        logFileds.add(resultCode);
        logFileds.add(resultFailMsg);
        logFileds.add(dt);

        return Joiner.on(HIVE_SEPARATOR).join(logFileds);
    }

    public static class Builder implements Interceptor.Builder
    {
        public Interceptor build()
        {
            return new HiveLogInterceptor2();
        }

        public void configure(Context arg0)
        {
        }
    }
}
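To sanity-check the interceptor outside Flume, a small hypothetical test harness like the following can be used (the sample line is an abbreviated version of the log shown earlier; EventBuilder is Flume's standard event factory):

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import com.google.common.base.Charsets;

// Hypothetical quick test: push one raw log line through the interceptor and
// print the \u0001-separated record that would be forwarded to the collector.
public class HiveLogInterceptor2Test {
    public static void main(String[] args) {
        String rawLine = "2016-11-30 09:18:21.916 INFO - {\"tableName\":\"ReportView\","
                + "\"logRequest\":{},\"timestamp\":1480468701432,\"statBody\":{},"
                + "\"logResponse\":{},\"resultCode\":1,\"resultFailMsg\":\"\"}";

        HiveLogInterceptor2 interceptor =
                (HiveLogInterceptor2) new HiveLogInterceptor2.Builder().build();
        Event out = interceptor.intercept(EventBuilder.withBody(rawLine, Charsets.UTF_8));

        // Replace the invisible \u0001 separator with "^A" so it is readable on stdout.
        System.out.println(new String(out.getBody(), Charsets.UTF_8).replace("\u0001", "^A"));
        System.out.println("timestamp header: " + out.getHeaders().get("timestamp"));
    }
}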

Add the following to pom.xml, package the interceptor project with maven, and copy the resulting jar along with its dependency jars into the ${flume-agent}/lib directory.

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>true</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

The interceptor separates the fields with the "\001" delimiter. A record after interception looks like the following, where ^A is "\001":

ReportView^A{"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}}^A1480468701432^A{"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532}^A{"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"请求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}}^A1^A^A2016-11-30

At this point, the Flume + HDFS + Hive setup is complete.

The collected data can then be analyzed with MapReduce or HQL.
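As a sketch only: if a HiveServer2 instance were available (for example on centos1 at the default port 10000, which is not part of the setup described above), the same table could also be queried from Java over JDBC:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical analysis example: count collected events per day (per dt partition).
// Assumes HiveServer2 on centos1:10000 and the hive-jdbc driver on the classpath;
// the hive CLI shown in the next section works just as well.
public class SkydataHiveQuery {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:hive2://centos1:10000/default", "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT dt, count(*) FROM skydata_hive_log GROUP BY dt")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
            }
        }
    }
}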

VI. Startup and results

1. Start hadoop HDFS

See my earlier article on setting up a hadoop 1.2 cluster and environment: http://www.cnblogs.com/xckk/p/6124553.html

2. Start flume_collector and flume_agent. The flume startup command takes quite a few parameters, so I wrote a small startup script:

start-Flume.sh

#!/bin/bash
jps -l|grep org.apache.flume.node.Application|awk '{print $1}'|xargs kill -9 2>&1 >/dev/null
cd "$(dirname "$0")"
cd ..
nohup bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 2>&1 > /dev/null &

3. Check the data on HDFS

You can see that the collected logs have been uploaded to HDFS:

[root@centos1 bin]# rm -rf FlumeData..tmp
[root@centos1 bin]# hadoop fs -ls /flume/skydata_hive_log/dt=--/
Found items
-rw-r--r-- root supergroup -- : /flume/skydata_hive_log/dt=--/FlumeData..tmp
-rw-r--r-- root supergroup -- : /flume/skydata_hive_log/dt=--/FlumeData.
-rw-r--r-- root supergroup -- : /flume/skydata_hive_log/dt=--/FlumeData.
[root@centos1 bin]#

4. Start hive and query the data; hive can now load the data from HDFS:

[root@centos1 lib]# hive
Logging initialized using configuration in file:/root/apache-hive-1.2.-bin/conf/hive-log4j.properties
hive> select * from skydata_hive_log limit ;
OK
ReportView {"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":,"pageSize":,"searchs":[],"orders":[],"pageNum":}} {"sourceId":,"reportId":,"projectId":,"userId":} {"responseBody":{"statusCodeValue":,"httpHeaders":{},"body":{"msg":"请求成功","httpCode":,"timestamp":},"statusCode":"OK"},"response":{}} --
ReportDesignResult {"request":{},"requestBody":{"sourceId":,"detailInfos":[{"colName":"月份","flag":"","reportId":,"colCode":"col_2_22745","pageSize":,"type":"","pageNum":,"rcolCode":"col_25538","colType":"string","formula":"","id":,"position":"row","colId":,"dorder":,"pColName":"月份","pRcolCode":"col_25538"},{"colName":"综合利率(合计)","flag":"","reportId":,"colCode":"col_11_22745","pageSize":,"type":"","pageNum":,"rcolCode":"sum_col_25539","colType":"number","formula":"sum","id":,"position":"group","colId":,"dorder":,"pColName":"综合利率","pRcolCode":"col_25539"}],"flag":"bar1","reportId":,"reportName":"iiiissszzzV","pageSize":,"searchs":[],"orders":[],"pageNum":,"projectId":}} {"reportType":"bar1","sourceId":,"reportId":,"num":,"usedFields":"月份$$综合利率(合计)$$","projectId":,"userId":} {"responseBody":{"statusCodeValue":,"httpHeaders":{},"body":{"msg":"请求成功","reportId":,"httpCode":,"timestamp":},"statusCode":"OK"},"response":{}} --
Time taken: 2.212 seconds, Fetched: row(s)
hive>

VII. Common problems and fixes

1. FATAL: Spool Directory source skydataSource: { spoolDir: /opt/flumeSpool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.

java.nio.charset.MalformedInputException: Input length = 1

Possible causes:

1. Character encoding: the log files under spoolDir must be UTF-8.

2. When using the Spooling Directory Source, never read a file that is still being written. Add the following to the conf file (a quick check of what this pattern skips is sketched below):

a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
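To see which file names this pattern actually skips, here is a quick hypothetical check with java.util.regex (the sample names assume SERVER_NAME=server1; Flume matches ignorePattern against the file name):

import java.util.regex.Pattern;

// Hypothetical check of the ignorePattern above: live *.log files and already
// processed *.COMPLETED files should be skipped, rolled files should be picked up.
public class IgnorePatternCheck {
    public static void main(String[] args) {
        Pattern ignore = Pattern.compile("([^_]+)|(.*(\\.log)$)|(.*(\\.COMPLETED)$)");
        String[] names = {
                "server1_product.log",                              // still being written -> ignored
                "server1_product.log.2016-11-30-09-18-1",           // rolled file -> picked up
                "server1_product.log.2016-11-30-09-18-1.COMPLETED"  // already processed -> ignored
        };
        for (String name : names) {
            // note: the first alternative ([^_]+) also ignores any name without an underscore
            System.out.println(name + " ignored=" + ignore.matcher(name).matches());
        }
    }
}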

2. Logs are written to the hadoop directory, but hive queries return no data. For example, there is data under hdfs://centos1:9000/flume/skydata_hive_log/dt=2016-12-01/,

yet select * from skydata_hive_log returns nothing.

Possible causes:

1. Partitions were not created when the table was built. Even though flume is configured with a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d, the table's partition metadata does not exist, so once the files land on HDFS, hive cannot read them.

Fix: create the partitions first. Put the following in an executable shell script to pre-create the table's partitions:

for ((i=-;i<=;i++))
do
    dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    echo date=$dt
    hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>> logs/init_skydata_hive_log.err
done

2. The file on HDFS may still be a .tmp file that HDFS is writing to. Hive cannot read .tmp files; it only reads files without the .tmp suffix.

Fix: wait for the configured HDFS roll interval to elapse (or for the file to reach the roll size) so the .tmp file is renamed to a regular log file, then query hive again and the data will show up.

By 秀才坤坤

Please credit the source when reposting.

Original article: http://www.cnblogs.com/xckk/p/6125838.html
