linux 之 shell脚本实现SFTP下载、上传文件、执行sftp命令
|
需求
需求方通过sftp不定时的上传一批用户(SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv),需要我们从这些用户中找出满足条件的用户。然后把这些结果用户通过文件的形式上传到ftp。
环境说明
ip1能连接hive库环境,不能连接sftp。
ip2不能连接hive库环境,能连接sftp。
ip1和ip2是共享盘,能同时访问公共目录。
目录规划
源文件名: SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv (例:SBXDS_ACC_M_test001_20240201103828.csv)
结果文件名: WTF_YBZ_DSGS_任务id.csv (例:WTF_YBZ_DSGS_test001.csv)
本地路径: /date/localPath/SBXDS_ACC_M
sftp下载路径(源文件):/ftpdata/preciseUpload
sftp上传路径(结果文件):/ftpdata/orderringlist
处理逻辑
1.通过sftp获取preciseUpload目录下,当天的 SBXDS_ACC_M_*_YYYYMMDD*.csv 文件名,并写入本地文件SBXDS_ACC_M.txt
2.比对SBXDS_ACC_M.txt里面的文件是否在SBXDS_ACC_M目录下存在,不存在则将文件名写入SBXDS_ACC_M_NEWFILES.txt。
3.获取SBXDS_ACC_M_NEWFILE.txt里面的文件,并写入hive表 dods_ftp_target_users partition(task_id)。
4.目标用户与通信圈用户匹配,跑出结果数据dapp_ftp_calling_txq_users partition(task_id)。
5.结果数据写入文件WTF_YBZ_DSGS_task_id.csv,并上传sftp
6.为了防止文件正在传输,内容未传输完,需求方就把文件取走了,导致漏数据的情况。在文件上传之后,对文件进行重命名处理。
hive表
- -- 目标用户hive表
- create table ods_target_users(
- phone_no string
- )
- -- 结果数据hive表
- create table dapp_target_result_users(
- phone_no string,
- msg_info string
- )
复制代码 脚本执行顺序
sh ftp_getTodayFiles.sh 20240201 # ip1执行,文件处理
sh ftp_getTxq_users.sh # ip2 执行,跑SQL
sh ftp_sftpPutFiles.sh # ip1 执行,文件处理
脚本
ftp_getTodayFiles.sh- #!/bin/bash
- # ftp_getTxq_users.sh
- # ip1 执行
- # 把SBXDS_ACC_M_NEWFILE.txt里面的文件写入hive表 dods_ftp_target_users partition(task_id)。
- # 匹配通信圈用户 dapp_ftp_txq_users
- # 结果数据导入文件WTF_YBZ_DSGS_task_id
- # 调用格式: sh ftp_getTxq_users.sh 20240114
- dateNo=$1
- LOCALDIR="/date/localPath/SBXDS_ACC_M"
- cd $LOCALDIR
- newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l)
- if [ $newfiles -eq 0 ]; then
- echo "无新增文件。"
- exit
- fi
- fileList=()
- for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//')
- do
- task_id=$(echo $filename|cut -d'_' -f4)
- echo "文件名: $filename,任务ID: $task_id"
- echo "执行命令:hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
- hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
- if [ $? -eq 0 ]; then
- echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
- hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
- else
- echo "分区 $task_id 已存在。"
- echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
- hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
- fi
-
- # 匹配通信圈用户
- hive -e"
- MSCK REPAIR TABLE dods_ftp_target_users;
- insert overwrite table dapp_ftp_txq_users partition(task_id='${task_id}')
- select a.phone_no,
- b.msg_info
- from dods_ftp_target_users a,
- dods_msg_info b
- where a.phone_no=b.phone_no
- and a.task_id='${task_id}'
- and b.deal_day='${dateNo}'
- group by a.phone_no,
- b.msg_info
- "
- # 结果数据导入文件
- hive -e"
- select concat_ws(',',phone_no,msg_info)
- from dapp_ftp_txq_users
- where task_id = '${task_id}'
- " > WTF_YBZ_DSGS_${task_id}
-
- # 文件记录数
- file_rows=$(cat WTF_YBZ_DSGS_${task_id}|wc -l)
- msgs="源文件$filename的结果文件WTF_YBZ_DSGS_${task_id},记录数$file_rows。"
- fileList+=$msgs
- done
- echo "************************* 执行成功 *************************"
- for msg in ${fileList[*]}
- do
- echo $msg
- done
复制代码 ftp_getTxq_users.sh[code]#!/bin/bash# ftp_sftpPutFiles.sh# ip2 执行# 调用格式 sh ftp_sftpPutFiles.sh# 定义函数,sftp上传文件function sftpPutFiles(){FILENAME=$1PORT=22HOST="ip1" USERNAME="uname1"PASSWORD="123456" LOCALDIR="/date/localPath/SBXDS_ACC_M"SFTPDIR="/ftpdata/orderringlist"cd $LOCALDIR/usr/bin/expect |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|
|
|
发表于 2024-2-1 22:24:00
举报
回复
分享
|
|
|
|