1 | 点我达大数据团队初创于2015年,随着公司业务发展,大数据对于公司的业务发展发挥了越来越大的作用,目前服务的用户/团队包含BI、产品运营、运力中心以及技术内部应用的数据服务等 |
目前大数据主要的结构如下:
从最下面一层往上依次为:
一、接入层
1、DataX
a) dataX 是一个 ETL 工具,阿里出品
b) 采用 Framework + plugin 架构构建,幸运的是自带了常用的插件,比如 MysqlReader、HdfsWriter 等
c) Standalone,无中心,每个实例之间无关联
d) 性能强劲、相对于 sqoop,配置更简单
e) 稳定高效,我们引入 DataX 以来,从来没有在数据传输上出过问题
2、DTS/Canal
a) Canal 是基于 Mysql binlog 变动,实时同步数据变化的工具
b) DTS 是阿里云提供的一套数据产品,原理和 Canal 类似,我们想监听某个表的变动,只需订阅,就可以获取到数据的实时变化
c) 迁云之后,主要使用 DTS
d) 数据落地我们使用 kafka,topic 一般是 schema.table_name
e) 消费方一般是 streaming、airflow 调度的 spark 作业等,计算后的数据二次落地在 redis、hbase、es 等数据集群中
3、Logstash
a) 因为配置相对于 flume 简单,并且和 elasticsearch、kibana 紧密结合,我们的应用日志大部分使用 logstash 进行收集
b) logstash 将日志落地到 kafka,我们的计算引擎进行计算
4、Flume
a) flume 主要是采集了移动端的 track log,用于计算骑手的轨迹
b) app 和 flume 之间我们搭建了一个 nodejs 写的 web 应用,示意如下:
二、存储层
存储层包含 hdfs、kafka、hbase、redis 和 elasticsearch 等,其中 redis、hbase、es 作为数据二次落地(最终计算结果)的存储介质。**
1、redis
首先如果数据结构符合 kv 结构,首选 redis,因为 redis 非常快,毫秒级别,哪怕 mget 一万个 key。
2、hbase
其次对于需要多个查询条件进行检索的数据,我们一般放到 hbase 中,应用通过 phoenix 引擎读取 hbase,hbase 如果 rowkey 设计合理,几十毫秒的 rt 是可以满足的,但是如果设计不合理,那是非常缓慢的,设计 rowkey 时,可以把能够过滤最多记录的 key 放在 rowkey 最前面,依次类推。
3、elasticsearch
** 对于写少读多的数据,而且检索条件非常丰富,我们放到 elasticsearch 中,比如历史订单查询。
三、计算层
1、Hive(MR)
a) hive 主要用于离线数据分析
b) hql 能实现大部分场景的数据统计,复杂处理逻辑使用 hive udf,hql 中调用 udf 函数就可以了
c) 分区方案采用按日分区的,部分表使用日期 + 小时 + 分钟的分区方案
2、Spark
a) spark 我们主要使用 spark sql 和 spark streaming(当然包含 spark core),前者主要处理批处理和交互式查询,后者用于实时计算,spark mllib 暂时没用到
b) 大部分实时处理使用 spark streaming,吞吐量高,并不需要纯实时和高可靠地事务机制,
c) 为了便于管理,和 hive mr 一样,统一使用 yarn 调度,后续可能考虑部分小内存的 spark job 使用 standalone 调度
3、Storm
a) storm 用得不多
b) 主要是云上的 dts 任务和云下仪表盘的 influxdb writer
四、调度层
1、Yarn
Yarn(MR2) 上面跑了两种作业:hive 和 spark 作业
a) hive(mr) job
hive 作业一般是 airflow 调度的每日例行作业,或者是数据分析人员(技术、bi、运力、运营等部门)通过 hive cli、hue 等工具提交的离线作业,运行在 root.default 队列上。可以自定义 Map/Reduce Container 的资源大小,如在 hive 中分别指定:set mapreduce.map.memory.mb=4096; set mapreduce.reduce.memory.mb=4096;
b) spark on yarn
在 yarn 上运行的 spark 作业分为 streaming 作业和 spark 批处理作业,分别运行在 root.spark 和 root.airflow 队列上。可指定 driver、executor 占用资源:$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --queue spark --executor-cores 1 --executor-memory 2g --num-executors 1 --driver-memory 1g --driver-cores 1
2、Airflow
a) 我们的 daily 任务、5/15/30min 任务都靠 airflow 调度
b) 一个支持 dag(有向无环图)的任务调度工具
c) 一个任务失败了,它的所有 downstream 任务都是失败的,都不会运行,保证了数据的正确性
d) python 编写,配置比 oozie 简单
五、OLAP 层
1、hive 数据仓库
hive 数据仓库是任何大数据平台中基础的基础,任何业务数据都按照某种分区规则存放在此,是各种计算引擎的数据基础。我们在设计 hive 仓库时,根据 hive 表的作用范围划分为几个命名空间,分别是:
a) ODS
ODS(Operational Data Store) 代表记录层面,在我们这里代表 etl 拉过来的基础数据,通俗地说,就是 datax 拉的数据表
b) DW
DW 是根据业务分析需求,在 ods 的基础上进行最基础的聚合,是所有统计分析数据的基础
c) DWS/Report
DWS/Report 是 DW 的基础上进行聚合,面向某一统计主题的、对调用者而言无需再进行 join 的
2、Presto
Presto 集群可以拉取 hive 表,到内存中计算,比 hive 快很多,支持 Ad-hoc,没有引入 Presto 之前,cube 的数据源是 hive 计算后通过 datax 将结果导入到 mysql。
但是随着业务的发展,一个报表需求的查询维度是任意维度的,也就是通常所说的 ad-hoc,这样一来,我们要为每一个查询维度生成一张 hive 表,然后 hive 表导入到 mysql,这样既给数仓层的设计造成了不便,同时 mysql 也保存了所有维度的大量数据,mysql 表只保存最终统计结果的目标也就失去了意义,引入 presto 后,解决了我们这些棘手的问题。
3、Cube
Cube 是一个数据呈现平台,主要用于公司业务人员和分析部门的数据需求,所有数据从 Presto 中读取,目前分为 PC 端和移动端,经过产品上线以来的运营情况,移动端的数据需求占比越来越高。
4、数据大屏
数据大屏用来展示点我达规模、质量等服务运营指标,为公司内部和外部相关人群提供运营指标参考。数据大屏采用阿里云的 datav 数据可视化产品搭建,非本地部署方式,我们采购了 4 个显示大屏用于展示我们创建的 4 个 datav 屏幕,具体工作方式为:
a) 登录 datav 控制台,创建 “可视化屏幕”
b) 设置屏幕的数据来源 url、展示数据的格式和布局
c) 发布屏幕
d) 将配置完成的 4 张屏幕投放到大屏显示器中
5、hive 元数据管理 warden
对于 bi 分析人员,如果没有一个便捷的工具,在庞大的 hive 数据仓库中找到想要的指标是意见很困难的事情,为此,我们开发了一套 hive 元数据查询系统,比如,我关心订单妥投情况,我直接在全文检索搜索框中输入 “妥投”,就会找到有哪些 hive 表包含了这个信息,截图如下:
六、OLTP 层
用于内部系统的数据服务调用,使用 dubbo rpc 提供服务,每一个 dubbo 服务,后面都有一个或者多个 streaming 任务进行计算 数据服务简单列举几个:
POI 服务:经纬度相关兴趣点服务
商家交易:商家通过 app 可以查看交易、账户等
骑手交易:骑手通过 app 查看交易、账户等
fft、fat:预测订单完成时间、预测骑手到店时间
天气服务:天气影响骑手考核和补贴
骑手轨迹:后台 crm 可以查看骑手的轨迹和到店、完成时点情况
历史订单查询:我们的业务库一般保留当天数据,ES 提供历史订单的全文检索功能
订单压力(热力图):根据传递的经纬度,获取所覆盖的所有小方格的 geohash 及其订单压力情况,后台 streaming 算好了每个 geohash 的压力值,存储于 redis 中
七、高可靠性保证
对于大数据团队内部产品而言,OLAP 应用一般是离线的或者使用者是内部用户,容许一定的延时,但 OLTP 服务一般处于整个 OLTP 系统的调用链中,需要高可靠性服务保证,我们从服务降级和监控报警处理两个层面实现服务的可靠性。
1、服务降级
比如压力服务,当我们的 streaming 任务挂掉或者处理缓慢怎么办,这时候我们起了一个每 5 分钟执行一次的备份 job,当 dubbo 服务执行时,它首先检查 streaming 任务的最后写入 redis key 的时间,当此时间与当前时间差距超过我们设定的阈值时,就判定 streaming 任务异常,这时候就获取备份 job 的数据,极大地保证了压力服务的可靠性,示意图如下:
2、监控报警体系
a) 基础平台告警(仪表盘)
仪表盘是点我达通用的一套监控报警系统,可以设定阈值,如果某个监控的标的超过阈值,可以邮件、短信息实时告警,可以在第一时间处理。比如我们的 dubbo 服务,最起码的一个监控指标是执行时长,我们做了一个注解 “@Monitor” 和处理此注解的切面程序,如何方法只要加入了这个注解,会把这个方法的执行时长加入到监控系统中,在仪表盘监控页面中就可以看到这个方法的监控项(当然需要自己添加 graph 和 influxdb mesurement) 对于另外个性化指标的监控,则每个服务自己去控制。
b) 大数据报警体系
大数据自身的报警体系如下图所示:
八、踩坑记录
大数据团队发展至今,遇到过很多问题,有大有小,不一而足,以下列举几个:
1、phoenix 版本不匹配
查询和建表使用的 phoenix 版本不一致导致的数据重复的问题。phoenix 表是使用 4.9 建的,dubbo 里面用的是 phoenix4.7,读取的时候发现读取到的数据有重复,后来升级 dubbo 服务的 phoenix 版本到 4.9,解决了此问题
2、spark2.1.1 的 bug
代码示例:
val df = spark.sql( """select user_id, timestamp, order_id, user_name, lng, lat, status, leave_tm from ( select cast(user_id as bigint) user_id, cast(timestamp as bigint) timestamp, cast(coalesce(get_json_object(args,'$.orderId'),'0') as int) order_id, user_name, cast(longitude as int) lng, cast(latitude as int) lat, get_json_object(args,'$.status') status, cast(get_json_object(args,'$.leaveTm') as bigint) leave_tm from logs where event='rider_trace') t where user_id is not null and timestamp is not null and order_id is not null and status is not null """)
这段代码是从 logs 数据源中读取和解析数据,其中 getjson_object_ 一直报错,提示java.io.CharConversionException: Invalid UTF-32 character 0x7b2265(above 10ffff) at char #192, byte #771) at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189) at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1855) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571) at org.apache.spark.sql.catalyst.expressions.GetJsonObject$$anonfun$eval$2$$anonfun$4.apply(jsonExpressions.scala:142)
提示是说某些非法字符,因为我们没有捕获异常,报错了程序也不退出,反复处理这一条脏数据,后来临时改一下代码,把 get**jsonobject** 去掉,绕过了这一条数据,再把程序改回来,后来在网上搜索了一下,发现是 spark2.1.1 的 bug,已经在 spark2.2.0 之后的版本修复了。
3、yarn 资源设置不当引起离线 dag 处理失败
为了提高作业的并发量,我们减少了单个 map/reduce container 的内存大小,结果某个订单的大作业无足够资源,运行失败,导致当日离线报表无数据。其实也是一个权衡,过大浪费资源、并发作业少;过小则大作业无法运行,需要我们对比较耗费资源的作业做到心里有数,对大作业单独指定资源
九、集群部署结构
附录点我达大数据服务集群部署结构如图示