sqoop 自动化脚本

Sqoop是一款开源的工具,主要用于在HADOOP(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS,NOSQL中,也可以将HDFS的数据导进到关系型数据库中。
Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目。

1、sqoop auto hive && snappy

  • (1)-m 参数的说明,代表含义是使用多少个并行,如果参数值是1说明没有开启并功能。

  • (2)-m 参数的值调大,使用并行导入的功能,如果-m 4说明可以并行开启4个进程,同时进行数据导入。

  • (3)如果从Oracle中导入的表没有主键,那么会出现如下的错误提示:
    ERROR tool.ImportTool: Error during import: No primary key could be
    found for table tmp.tbls . Please specify one with --split-by or perform
    a sequential import with '-m 1'.

  • (4)需要手动指定一个oracle字段,SQOOP开启并行导入功能,首先他会根据这个字段
    执行如下查询:

     select min(local_code), max(local_code) from pu_web.bigdata_area_code where
     data_desc='2016-02-26' [where 如果指定了where子句,没有则忽略] ==> 0691,9999
    

    然后sqoop会根据并行导入-m值,进行拆分,比如上面的结果0691-9999,-m 16那么会被均分为16份,并行执行导入。

    select * from table where 0 <= id < 0691+(9999-0691)/16;  累死这样的算法,均分16个范围,并行导入数据。
    

注意,拆分的字段需要是整数。

1、测试连接
sqoop list-tables --connect jdbc:oracle:thin:@oracle_ip:1521:orcl --username TMP --password 123

2、删除已存在表
hive -e 'drop table if exists tmp.barea_code'

3、自动建立hive表,并且导入数据到相应目录
sqoop import  -D mapreduce.job.queuename=mapred --connect jdbc:oracle:thin:@oracle_ip:1521:orcl --username TMP --password 123 --table BAREA_CODE --fields-terminated-by "\t" --lines-terminated-by "\n" --hive-import --create-hive-table --hive-overwrite --hive-table tmp.barea_code --null-string '\\N' --null-non-string '\\N'  --as-parquetfile --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m 16 --split-by local_code

4、重写已经存在hive表的数据
sqoop import  -D mapreduce.job.queuename=mapred --connect jdbc:oracle:thin:@oracle_ip:1521:orcl --username TMP --password 123 --table BAREA_CODE --fields-terminated-by "\t" --lines-terminated-by "\n" --hive-import --hive-overwrite --hive-table tmp.barea_code --null-string '\\N' --null-non-string '\\N' --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m 16 --split-by local_code

上面命令优点:
1、任务放到mapred任务队列
2、增加并行执行度,提升效率
3、文件以snappy格式存放到集群,目前snappy压缩,是最佳选择。

  • (5)、auto rdbms to hive && compress SnappyCodec
    ```
    $ cat rdbms_to_hive_auto.py

    !/usr/bin/env python

    coding: utf-8

    author = ‘whoami’
    import commands,logging

def is_num_by_except(num):
try:
int(num)
return True
except ValueError:

            # print "%s ValueError" % num
            return False

def db_settings():

    # db info format --> id,db_username,jdbc_info,db_passswd
    db_info = {1:'WEB,jdbc:oracle:thin:@host_ip:1521:orcl,123', 
               2:'TARI,jdbc:oracle:thin:@host_ip:1521:orcl,123',
               3:'TEST,jdbc:oracle:thin:@host_ip:1521:orcl,123',
               4:'mysql,com.mysql.jdbc.Driver,admin123'}

    return db_info

def database_info():
lines = db_settings()
print ‘SQOOP:Oracle jdbc connect info…’
for k,v in lines.items():
print ‘ ‘,k,v

def get_matche_conn(db):

    while 1:
            number = raw_input("Please input number: ")
            if is_num_by_except(number):
                    con = db.get(int(number))
                    return con
                    break
            else:
                    logging.error(' Input types Error,please input number types... ')
                    continue

def is_sqoop_split():
is_split = raw_input(“Do you want support sqoop split [y/n] (n)?”)
if ‘y’ is is_split:
return True
else:
return False

def sqoop_split_column():
while 1:
sqoop_split_cloumn = raw_input(“Please input sqoop split cloumn,cloumn type is integer: “)
number = raw_input(“Please input sqoop split map concurrent execution number: “)

            if is_num_by_except(number):
                    return sqoop_split_cloumn,number
                    break
            else:
                    logging.error(' Input types Error,please input number types... ')
                    continue

def delete_hive_table(table_name):
command = “hive -e ‘drop table if exists %s’” %(table_name)
logging.warn(‘starting drop table if exists %s … ‘ %table_name)
status,result = commands.getstatusoutput(command)

    if status == 0:
            logging.warn(result)
            logging.warn('drop hive table success... '),
    else:
            logging.warn('drop hive table fail...')

def raw_table():
rdbms_table_name = raw_input(“Please input rdbms table name: “)

    hive_table_name = raw_input("Please input hive table name: ")

    return rdbms_table_name,hive_table_name

def sqoop_task_parse(is_split,rdbms_table_name,hive_table_name,rdbms_username,rdbms_jdbc,rdbms_passwd):
if is_split:
split_column,split_m=sqoop_split_column()
sqoop_import = ‘sqoop import -D mapreduce.job.queuename=db —connect %s —username %s —password %s —table %s —fields-terminated-by “\t” —lines-terminated-by “\n” —hive-import —create-hive-table —hive-table %s —compression-codec “org.apache.hadoop.io.compress.SnappyCodec” -m %s —split-by %s’ %(rdbms_jdbc,rdbms_username,rdbms_passwd,rdbms_table_name,hive_table_name,split_m,split_column) + “ —null-string ‘\\N’ —null-non-string ‘\\N’”
return sqoop_import
else:
sqoop_import = ‘sqoop import -D mapreduce.job.queuename=db —connect %s —username %s —password %s —table %s —fields-terminated-by “\t” —lines-terminated-by “\n” —hive-import —create-hive-table —hive-table %s —compression-codec “org.apache.hadoop.io.compress.SnappyCodec” -m 1’ %(rdbms_jdbc,rdbms_username,rdbms_passwd,rdbms_table_name,hive_table_name) + “ —null-string ‘\\N’ —null-non-string ‘\\N’”
return sqoop_import

def sqoop_task(sqoop_import):
logging.warn(‘starting sqoop import rdbms to hive …’)
status,result = commands.getstatusoutput(sqoop_import)

    if status == 0:
            logging.warn(result)
            logging.warn('sqoop import rdbms to hive success... '),
    else:
            logging.warn('sqoop import rdbms to hive fail...')

def task_execute(hive_table_name,sqoop_import):
delete_hive_table(hive_table_name)
print sqoop_import
sqoop_task(sqoop_import)

def sqoop_main():
database_info()
db = db_settings()
conn = get_matche_conn(db).strip().split(‘,’)
rdbms_table_name,hive_table_name = raw_table()
is_split = is_sqoop_split()
sqoop_import = sqoop_task_parse(is_split,rdbms_table_name,hive_table_name,conn[0],conn[1],conn[2])
task_execute(hive_table_name,sqoop_import)

sqoop_main()
```

2、auto hdfs to oracle

3、问题整理?

  • Oracle导数据需要注意,表明和库明要大写,否则会找不到表。

  • 导parquet格式数据:

    • ERROR:Dataset name tmp.bareacode is not alphanumeric (plus ‘‘)

    • impala 生成的parquet表无法导回oracle,mr生成的parquet能导回oracle

    • Import null内容处理,select * from d_area_code where area_name is null;

      • —null-string ‘\N’
      • —null-non-string ‘\N’
      • rdbms导入数据到Hadoop集群,加入上面两个参数,执行上面SQL可以过滤出结果
      • 不加上面两个参数,导入hdfs之后结果变成’null‘,执行上面SQL无法过滤出结果
      • 如果加入参数,可以过滤出结果,显示’NULL’内容。
    • Export null内容处理和Import道理一致

      • —input-null-string ‘\N’
      • —input-null-non-string ‘\N’

原创文章,转载请注明: 转载自Itweet的博客
本博客的文章集合: http://www.itweet.cn/blog/archive/