翼度科技»论坛 云主机 LINUX 查看内容

linux 之 shell脚本实现SFTP下载、上传文件、执行sftp命令

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
需求

需求方通过sftp不定时的上传一批用户(SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv),需要我们从这些用户中找出满足条件的用户。然后把这些结果用户通过文件的形式上传到ftp。
环境说明

ip1能连接hive库环境,不能连接sftp。
ip2不能连接hive库环境,能连接sftp。
ip1和ip2是共享盘,能同时访问公共目录。
目录规划

源文件名: SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv (例:SBXDS_ACC_M_test001_20240201103828.csv)
结果文件名: WTF_YBZ_DSGS_任务id.csv (例:WTF_YBZ_DSGS_test001.csv)
本地路径: /date/localPath/SBXDS_ACC_M
sftp下载路径(源文件):/ftpdata/preciseUpload
sftp上传路径(结果文件):/ftpdata/orderringlist
处理逻辑

1.通过sftp获取preciseUpload目录下,当天的 SBXDS_ACC_M_*_YYYYMMDD*.csv 文件名,并写入本地文件SBXDS_ACC_M.txt
2.比对SBXDS_ACC_M.txt里面的文件是否在SBXDS_ACC_M目录下存在,不存在则将文件名写入SBXDS_ACC_M_NEWFILES.txt。
3.获取SBXDS_ACC_M_NEWFILE.txt里面的文件,并写入hive表 dods_ftp_target_users partition(task_id)。
4.目标用户与通信圈用户匹配,跑出结果数据dapp_ftp_calling_txq_users partition(task_id)。
5.结果数据写入文件WTF_YBZ_DSGS_task_id.csv,并上传sftp
6.为了防止文件正在传输,内容未传输完,需求方就把文件取走了,导致漏数据的情况。在文件上传之后,对文件进行重命名处理。
hive表
  1. -- 目标用户hive表
  2. create table ods_target_users(
  3. phone_no string
  4. )
  5. -- 结果数据hive表
  6. create table dapp_target_result_users(
  7. phone_no string,
  8. msg_info string
  9. )
复制代码
脚本执行顺序

sh ftp_getTodayFiles.sh 20240201 # ip1执行,文件处理
sh ftp_getTxq_users.sh # ip2 执行,跑SQL
sh ftp_sftpPutFiles.sh # ip1 执行,文件处理
脚本

  1. #!/bin/bash
  2. # ftp_getTodayFiles.sh
  3. # ip2 执行
  4. # 调用方式 sh ftp_getTodayFiles.sh 20240201
  5. # 定义函数,获取文件名称
  6. function funcGetTodayFilesName(){
  7. dateNo=$1
  8. PORT=22
  9. HOST="ip1"  
  10. USERNAME="uname1"
  11. PASSWORD="123456"  
  12. LOCALDIR="/date/localPath/SBXDS_ACC_M"
  13. SFTPDIR="/ftpdata/preciseUpload"
  14. FILENAME="SBXDS_ACC_M_*_$dateNo*"
  15. cd $LOCALDIR
  16. /usr/bin/expect << EOF
  17. spawn sftp -P $PORT $USERNAME@$HOST
  18. expect "*Password*"
  19. send "$PASSWORD\r"
  20. expect "*#"
  21. send "cd $SFTPDIR\r"
  22. expect "*#"
  23. send "ls -1 $FILENAME\r"
  24. expect eof
  25. EOF
  26. }
  27. # 定义函数,sftp获取文件
  28. function funcSftpGetFile(){
  29. FILENAME=$1
  30. PORT=22
  31. HOST="ip1"  
  32. USERNAME="uname1"
  33. PASSWORD="123456"  
  34. LOCALDIR="/date/localPath/SBXDS_ACC_M"
  35. SFTPDIR="/ftpdata/preciseUpload"
  36. cd $LOCALDIR
  37. /usr/bin/expect << EOF
  38. spawn sftp -P $PORT $USERNAME@$HOST
  39. expect "*Password*"
  40. send "$PASSWORD\r"
  41. expect "*#"
  42. send "cd $SFTPDIR\r"
  43. expect "*#"
  44. send "get $FILENAME \r"
  45. expect eof
  46. EOF
  47. }
  48. # 调用函数,获取文件名称
  49. dateNo=$1
  50. LOCALDIR="/date/localPath/SBXDS_ACC_M"
  51. cd $LOCALDIR
  52. echo "开始获取sftp目录下当天文件名称SBXDS_ACC_M_*_$dateNo*......"
  53. funcGetTodayFilesName $dateNo|grep ".csv" > SBXDS_ACC_M_FILES.txt
  54. echo "获取sftp目录下当天文件名称结束"
  55. dayfiles=$(cat SBXDS_ACC_M_FILES.txt|wc -l)
  56. if [ $dayfiles -eq 0 ]; then
  57.   echo "sftp今日无文件。"
  58.   exit
  59. else
  60.   echo "当日所有文件:"
  61.   cat SBXDS_ACC_M_FILES.txt
  62. fi
  63. echo "清空文件 SBXDS_ACC_M_NEWFILES.txt"
  64. echo "判断本地是否存在文件,不存在则将文件名写入文件SBXDS_ACC_M_NEWFILES.txt"
  65. > SBXDS_ACC_M_NEWFILES.txt
  66. for filename in $(cat SBXDS_ACC_M_FILES.txt|sed 's/\r$//')
  67. do
  68.     if [ -e $filename ];    then
  69.       echo "$filename 已存在"
  70.   else
  71.       echo "$filename 新文件"
  72.       echo $filename >> SBXDS_ACC_M_NEWFILES.txt
  73.   fi
  74. done
  75. newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l)
  76. if [ $newfiles -eq 0 ]; then
  77.   echo "无新增文件。"
  78.   exit
  79. else
  80.   echo "文件名写入SBXDS_ACC_M_NEWFILES.txt结束"
  81.   echo "当日新增文件$newfiles个:"
  82.   cat SBXDS_ACC_M_NEWFILES.txt
  83. fi
  84. # 调用函数,sftp获取文件
  85. echo "开始下载文件...."
  86. LOCALDIR="/date/localPath/SBXDS_ACC_M"
  87. cd $LOCALDIR
  88. for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//')
  89. do
  90.     echo "下载$filename..."
  91.     funcSftpGetFile $filename
  92. done
  93. echo "下载文件结束,文件名如下:"
  94. cat SBXDS_ACC_M_NEWFILES.txt
  95. # 验证 SBXDS_ACC_M_NEWFILES.txt
  96. newfiles2=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l)
  97. if [ $newfiles2 -eq 0 ]; then
  98.   echo "文件异常!!!"
  99.   exit 1
  100. fi
复制代码
ftp_getTodayFiles.sh
  1. #!/bin/bash
  2. # ftp_getTxq_users.sh
  3. # ip1 执行
  4. # 把SBXDS_ACC_M_NEWFILE.txt里面的文件写入hive表 dods_ftp_target_users partition(task_id)。
  5. # 匹配通信圈用户 dapp_ftp_txq_users
  6. # 结果数据导入文件WTF_YBZ_DSGS_task_id
  7. # 调用格式: sh ftp_getTxq_users.sh 20240114
  8. dateNo=$1
  9. LOCALDIR="/date/localPath/SBXDS_ACC_M"
  10. cd $LOCALDIR
  11. newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l)
  12. if [ $newfiles -eq 0 ]; then
  13.   echo "无新增文件。"
  14.   exit
  15. fi
  16. fileList=()
  17. for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//')
  18.     do
  19.     task_id=$(echo $filename|cut -d'_' -f4)
  20.     echo "文件名: $filename,任务ID: $task_id"
  21.     echo "执行命令:hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
  22.     hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
  23.     if [ $? -eq 0 ]; then
  24.       echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
  25.         hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
  26.   else
  27.       echo "分区 $task_id 已存在。"
  28.       echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
  29.         hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
  30.   fi
  31.    
  32.   # 匹配通信圈用户
  33.   hive -e"
  34.   MSCK REPAIR TABLE dods_ftp_target_users;
  35.   insert overwrite table dapp_ftp_txq_users partition(task_id='${task_id}')
  36.   select a.phone_no,
  37.          b.msg_info
  38.   from   dods_ftp_target_users a,
  39.          dods_msg_info b
  40.   where  a.phone_no=b.phone_no
  41.   and    a.task_id='${task_id}'
  42.   and    b.deal_day='${dateNo}'
  43.   group  by a.phone_no,
  44.          b.msg_info
  45.   "
  46.   # 结果数据导入文件
  47.   hive -e"
  48.     select concat_ws(',',phone_no,msg_info)
  49.     from dapp_ftp_txq_users
  50.     where task_id = '${task_id}'
  51.   " > WTF_YBZ_DSGS_${task_id}
  52.   
  53.   # 文件记录数
  54.   file_rows=$(cat WTF_YBZ_DSGS_${task_id}|wc -l)
  55.   msgs="源文件$filename的结果文件WTF_YBZ_DSGS_${task_id},记录数$file_rows。"
  56.   fileList+=$msgs
  57.   done
  58. echo "************************* 执行成功 *************************"
  59. for msg in ${fileList[*]}
  60. do
  61.     echo $msg
  62. done
复制代码
ftp_getTxq_users.sh
[code]#!/bin/bash# ftp_sftpPutFiles.sh# ip2 执行# 调用格式 sh ftp_sftpPutFiles.sh# 定义函数,sftp上传文件function sftpPutFiles(){FILENAME=$1PORT=22HOST="ip1"  USERNAME="uname1"PASSWORD="123456"  LOCALDIR="/date/localPath/SBXDS_ACC_M"SFTPDIR="/ftpdata/orderringlist"cd $LOCALDIR/usr/bin/expect

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具