sqoop 自动化脚本
Sqoop是一款开源的工具,主要用于在HADOOP(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS,NOSQL中,也可以将HDFS的数据导进到关系型数据库中。
Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目。
1、sqoop auto hive && snappy
(1)-m 参数的说明,代表含义是使用多少个并行,如果参数值是1说明没有开启并功能。
(2)-m 参数的值调大,使用并行导入的功能,如果-m 4说明可以并行开启4个进程,同时进行数据导入。
(3)如果从Oracle中导入的表没有主键,那么会出现如下的错误提示:
ERROR tool.ImportTool: Error during import: No primary key could be
found for table tmp.tbls . Please specify one with --split-by or perform
a sequential import with '-m 1'.
(4)需要手动指定一个oracle字段,SQOOP开启并行导入功能,首先他会根据这个字段
执行如下查询:1
2select min(local_code), max(local_code) from pu_web.bigdata_area_code where
data_desc='2016-02-26' [where 如果指定了where子句,没有则忽略] ==> 0691,9999然后sqoop会根据并行导入-m值,进行拆分,比如上面的结果0691-9999,-m 16那么会被均分为16份,并行执行导入。
1
select * from table where 0 <= id < 0691+(9999-0691)/16; 累死这样的算法,均分16个范围,并行导入数据。
注意,拆分的字段需要是整数。
1 | 1、测试连接 |
上面命令优点:
1、任务放到mapred任务队列
2、增加并行执行度,提升效率
3、文件以snappy格式存放到集群,目前snappy压缩,是最佳选择。
- (5)、auto rdbms to hive && compress SnappyCodec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114$ cat rdbms_to_hive_auto.py
#!/usr/bin/env python
# coding: utf-8
__author__ = 'whoami'
import commands,logging
def is_num_by_except(num):
try:
int(num)
return True
except ValueError:
# print "%s ValueError" % num
return False
def db_settings():
# db info format --> id,db_username,jdbc_info,db_passswd
db_info = {1:'WEB,jdbc:oracle:thin:@host_ip:1521:orcl,123',
2:'TARI,jdbc:oracle:thin:@host_ip:1521:orcl,123',
3:'TEST,jdbc:oracle:thin:@host_ip:1521:orcl,123',
4:'mysql,com.mysql.jdbc.Driver,admin123'}
return db_info
def database_info():
lines = db_settings()
print 'SQOOP:Oracle jdbc connect info...'
for k,v in lines.items():
print ' ',k,v
def get_matche_conn(db):
while 1:
number = raw_input("Please input number: ")
if is_num_by_except(number):
con = db.get(int(number))
return con
break
else:
logging.error(' Input types Error,please input number types... ')
continue
def is_sqoop_split():
is_split = raw_input("Do you want support sqoop split [y/n] (n)?")
if 'y' is is_split:
return True
else:
return False
def sqoop_split_column():
while 1:
sqoop_split_cloumn = raw_input("Please input sqoop split cloumn,cloumn type is integer: ")
number = raw_input("Please input sqoop split map concurrent execution number: ")
if is_num_by_except(number):
return sqoop_split_cloumn,number
break
else:
logging.error(' Input types Error,please input number types... ')
continue
def delete_hive_table(table_name):
command = "hive -e 'drop table if exists %s'" %(table_name)
logging.warn('starting drop table if exists %s ... ' %table_name)
status,result = commands.getstatusoutput(command)
if status == 0:
logging.warn(result)
logging.warn('drop hive table success... '),
else:
logging.warn('drop hive table fail...')
def raw_table():
rdbms_table_name = raw_input("Please input rdbms table name: ")
hive_table_name = raw_input("Please input hive table name: ")
return rdbms_table_name,hive_table_name
def sqoop_task_parse(is_split,rdbms_table_name,hive_table_name,rdbms_username,rdbms_jdbc,rdbms_passwd):
if is_split:
split_column,split_m=sqoop_split_column()
sqoop_import = 'sqoop import -D mapreduce.job.queuename=db --connect %s --username %s --password %s --table %s --fields-terminated-by "\\t" --lines-terminated-by "\\n" --hive-import --create-hive-table --hive-table %s --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m %s --split-by %s' %(rdbms_jdbc,rdbms_username,rdbms_passwd,rdbms_table_name,hive_table_name,split_m,split_column) + " --null-string '\\\N' --null-non-string '\\\N'"
return sqoop_import
else:
sqoop_import = 'sqoop import -D mapreduce.job.queuename=db --connect %s --username %s --password %s --table %s --fields-terminated-by "\\t" --lines-terminated-by "\\n" --hive-import --create-hive-table --hive-table %s --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m 1' %(rdbms_jdbc,rdbms_username,rdbms_passwd,rdbms_table_name,hive_table_name) + " --null-string '\\\N' --null-non-string '\\\N'"
return sqoop_import
def sqoop_task(sqoop_import):
logging.warn('starting sqoop import rdbms to hive ...')
status,result = commands.getstatusoutput(sqoop_import)
if status == 0:
logging.warn(result)
logging.warn('sqoop import rdbms to hive success... '),
else:
logging.warn('sqoop import rdbms to hive fail...')
def task_execute(hive_table_name,sqoop_import):
delete_hive_table(hive_table_name)
print sqoop_import
sqoop_task(sqoop_import)
def sqoop_main():
database_info()
db = db_settings()
conn = get_matche_conn(db).strip().split(',')
rdbms_table_name,hive_table_name = raw_table()
is_split = is_sqoop_split()
sqoop_import = sqoop_task_parse(is_split,rdbms_table_name,hive_table_name,conn[0],conn[1],conn[2])
task_execute(hive_table_name,sqoop_import)
sqoop_main()
2、auto hdfs to oracle
3、问题整理?
Oracle导数据需要注意,表明和库明要大写,否则会找不到表。
导parquet格式数据:
ERROR:Dataset name tmp.barea_code is not alphanumeric (plus ‘_’)
impala 生成的parquet表无法导回oracle,mr生成的parquet能导回oracle
Import null内容处理,select * from d_area_code where area_name is null;
- –null-string ‘\N’
- –null-non-string ‘\N’
- rdbms导入数据到Hadoop集群,加入上面两个参数,执行上面SQL可以过滤出结果
- 不加上面两个参数,导入hdfs之后结果变成’null‘,执行上面SQL无法过滤出结果
- 如果加入参数,可以过滤出结果,显示’NULL’内容。
Export null内容处理和Import道理一致
- –input-null-string ‘\N’
- –input-null-non-string ‘\N’