3. 数据处理和表优化

3. 数据处理和表优化

3.1 数据导入

beeline -n hive -p进入hql命令行

创建一个临时表,并加载csv数据文件加载到其中。

CREATE TEMPORARY TABLE temp_user_behavior (
`user_id` string comment 'user ID',
`item_id` string comment 'item ID',
`category_id` string comment 'category ID',
`behavior_type` string  comment 'behavior type among pv, buy, cart, fav',
`timestamp` int comment 'timestamp')
row format delimited
fields terminated by ','
lines terminated by '\n'
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/root/Hive/UserBehavior.csv' OVERWRITE INTO TABLE temp_user_behavior ;

创建用户行为表1。

drop table if exists user_behavior1;
create table user_behavior1 (
`user_id` string comment 'user ID',
`item_id` string comment 'item ID',
`category_id` string comment 'category ID',
`timestamp` timestamp comment 'timestamp'
)
PARTITIONED BY (`date` date, `behavior_type` string comment 'behavior type among pv, buy, cart, fav')
row format delimited
fields terminated by ','
lines terminated by '\n'
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

这里使用以列优先的存储格式,定义压缩算法为snappy,对于像电商分析这样主要查询列的项目,会提高很多效率。同时对日期date进行分区,以及用户行为behavior_type进行分区是一种合理的分区方法,在后续分析过程中将大大提高查询速度。

将数据导入到ORC表中,hive会自动执行 行列转化

INSERT OVERWRITE TABLE user_behavior1 PARTITION (`date`, behavior_type)
SELECT
  user_id,
  item_id,
  category_id,
  from_unixtime(`timestamp`) AS `timestamp`,
  date(from_unixtime(`timestamp`)) AS `date`,
  behavior_type
FROM
  temp_user_behavior;

查看一共多少条数据。

select count(*) from user_behavior1;
+------------+
|    _c0     |
+------------+
| 100150807  |
+------------+

3.2 数据清洗

-- 查看时间-数据分布情况,是否有异常值
select `date`, COUNT(*) from user_behavior1 group by `date` order by `date`;

-- 删除不在2017-11-25 到 2017-12-03日期的数据
alter table user_behavior1 
drop IF EXISTS partition (`date`<'2017-11-25'), partition (`date`>'2017-12-03');

-- 再次查看时间是否有异常值
select `date`, COUNT(*) from user_behavior1 group by `date` order by `date`;

+-------------+-----------+
|    date     |    _c1    |
+-------------+-----------+
| 2017-11-25  | 10511605  |
| 2017-11-26  | 10571046  |
| 2017-11-27  | 10013457  |
| 2017-11-28  | 9884189   |
| 2017-11-29  | 10319066  |
| 2017-11-30  | 10541698  |
| 2017-12-01  | 11171515  |
| 2017-12-02  | 13940949  |
| 2017-12-03  | 11961008  |
+-------------+-----------+
--查看 behavior_type 是否有异常值
select behavior_type, COUNT(*) from user_behavior1 group by behavior_type;

+----------------+-----------+
| behavior_type  |    _c1    |
+----------------+-----------+
| cart           | 5466118   |
| pv             | 88596903  |
| buy            | 1998976   |
| fav            | 2852536   |
+----------------+-----------+
-- 去掉完全重复的数据
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE user_behavior1
PARTITION (`date`, `behavior_type`)
SELECT DISTINCT `user_id`, `item_id`, `category_id`, `timestamp`, `date`, `behavior_type`
FROM user_behavior1
DISTRIBUTE BY `date`, `behavior_type`;

-- 查看目前多少条
select count(*) from user_behavior1;
+-----------+
|    _c0    |
+-----------+
| 98914484  |
+-----------+

DISTRIBUTE BY date, behavior_type这个是用来指定数据分发的策略,它会根据分区键的值将数据分发到不同的reduce任务中,每个reduce任务只处理一个分区的数据。这样就可以在每个分区内部去重,而不需要到全局数据去比较,所以效率高很多。在hdfs上,表按照 date, behavior_type分区后,分区的文件夹数量= date分区数* behavior_type分区数。当DISTRIBUTE BY date, behavior_type;时,可以理解为是在date分区数* behavior_type分区数 这么多个局部中比较去重。

同时如果DISTRIBUTE BY date, behavior_type粒度划分的太细,导致启动的容器太多,计算时间占比较低,可以选择只DISTRIBUTE BY一个。

Last updated