linux 之 shell脚本实现SFTP下载、上传文件、执行sftp命令
需求需求方通过sftp不定时的上传一批用户(SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv),需要我们从这些用户中找出满足条件的用户。然后把这些结果用户通过文件的形式上传到ftp。
环境说明
ip1能连接hive库环境,不能连接sftp。
ip2不能连接hive库环境,能连接sftp。
ip1和ip2是共享盘,能同时访问公共目录。
目录规划
源文件名: SBXDS_ACC_M_任务ID_yyyymmddHHMMSS.csv (例:SBXDS_ACC_M_test001_20240201103828.csv)
结果文件名: WTF_YBZ_DSGS_任务id.csv (例:WTF_YBZ_DSGS_test001.csv)
本地路径: /date/localPath/SBXDS_ACC_M
sftp下载路径(源文件):/ftpdata/preciseUpload
sftp上传路径(结果文件):/ftpdata/orderringlist
处理逻辑
1.通过sftp获取preciseUpload目录下,当天的 SBXDS_ACC_M_*_YYYYMMDD*.csv 文件名,并写入本地文件SBXDS_ACC_M.txt
2.比对SBXDS_ACC_M.txt里面的文件是否在SBXDS_ACC_M目录下存在,不存在则将文件名写入SBXDS_ACC_M_NEWFILES.txt。
3.获取SBXDS_ACC_M_NEWFILE.txt里面的文件,并写入hive表 dods_ftp_target_users partition(task_id)。
4.目标用户与通信圈用户匹配,跑出结果数据dapp_ftp_calling_txq_users partition(task_id)。
5.结果数据写入文件WTF_YBZ_DSGS_task_id.csv,并上传sftp
6.为了防止文件正在传输,内容未传输完,需求方就把文件取走了,导致漏数据的情况。在文件上传之后,对文件进行重命名处理。
hive表
-- 目标用户hive表
create table ods_target_users(
phone_no string
)
-- 结果数据hive表
create table dapp_target_result_users(
phone_no string,
msg_info string
)脚本执行顺序
sh ftp_getTodayFiles.sh 20240201 # ip1执行,文件处理
sh ftp_getTxq_users.sh # ip2 执行,跑SQL
sh ftp_sftpPutFiles.sh # ip1 执行,文件处理
脚本
#!/bin/bash
# ftp_getTodayFiles.sh
# ip2 执行
# 调用方式 sh ftp_getTodayFiles.sh 20240201
# 定义函数,获取文件名称
function funcGetTodayFilesName(){
dateNo=$1
PORT=22
HOST="ip1"
USERNAME="uname1"
PASSWORD="123456"
LOCALDIR="/date/localPath/SBXDS_ACC_M"
SFTPDIR="/ftpdata/preciseUpload"
FILENAME="SBXDS_ACC_M_*_$dateNo*"
cd $LOCALDIR
/usr/bin/expect << EOF
spawn sftp -P $PORT $USERNAME@$HOST
expect "*Password*"
send "$PASSWORD\r"
expect "*#"
send "cd $SFTPDIR\r"
expect "*#"
send "ls -1 $FILENAME\r"
expect eof
EOF
}
# 定义函数,sftp获取文件
function funcSftpGetFile(){
FILENAME=$1
PORT=22
HOST="ip1"
USERNAME="uname1"
PASSWORD="123456"
LOCALDIR="/date/localPath/SBXDS_ACC_M"
SFTPDIR="/ftpdata/preciseUpload"
cd $LOCALDIR
/usr/bin/expect << EOF
spawn sftp -P $PORT $USERNAME@$HOST
expect "*Password*"
send "$PASSWORD\r"
expect "*#"
send "cd $SFTPDIR\r"
expect "*#"
send "get $FILENAME \r"
expect eof
EOF
}
# 调用函数,获取文件名称
dateNo=$1
LOCALDIR="/date/localPath/SBXDS_ACC_M"
cd $LOCALDIR
echo "开始获取sftp目录下当天文件名称SBXDS_ACC_M_*_$dateNo*......"
funcGetTodayFilesName $dateNo|grep ".csv" > SBXDS_ACC_M_FILES.txt
echo "获取sftp目录下当天文件名称结束"
dayfiles=$(cat SBXDS_ACC_M_FILES.txt|wc -l)
if [ $dayfiles -eq 0 ]; then
echo "sftp今日无文件。"
exit
else
echo "当日所有文件:"
cat SBXDS_ACC_M_FILES.txt
fi
echo "清空文件 SBXDS_ACC_M_NEWFILES.txt"
echo "判断本地是否存在文件,不存在则将文件名写入文件SBXDS_ACC_M_NEWFILES.txt"
> SBXDS_ACC_M_NEWFILES.txt
for filename in $(cat SBXDS_ACC_M_FILES.txt|sed 's/\r$//')
do
if [ -e $filename ]; then
echo "$filename 已存在"
else
echo "$filename 新文件"
echo $filename >> SBXDS_ACC_M_NEWFILES.txt
fi
done
newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l)
if [ $newfiles -eq 0 ]; then
echo "无新增文件。"
exit
else
echo "文件名写入SBXDS_ACC_M_NEWFILES.txt结束"
echo "当日新增文件$newfiles个:"
cat SBXDS_ACC_M_NEWFILES.txt
fi
# 调用函数,sftp获取文件
echo "开始下载文件...."
LOCALDIR="/date/localPath/SBXDS_ACC_M"
cd $LOCALDIR
for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//')
do
echo "下载$filename..."
funcSftpGetFile $filename
done
echo "下载文件结束,文件名如下:"
cat SBXDS_ACC_M_NEWFILES.txt
# 验证 SBXDS_ACC_M_NEWFILES.txt
newfiles2=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l)
if [ $newfiles2 -eq 0 ]; then
echo "文件异常!!!"
exit 1
fiftp_getTodayFiles.sh#!/bin/bash
# ftp_getTxq_users.sh
# ip1 执行
# 把SBXDS_ACC_M_NEWFILE.txt里面的文件写入hive表 dods_ftp_target_users partition(task_id)。
# 匹配通信圈用户 dapp_ftp_txq_users
# 结果数据导入文件WTF_YBZ_DSGS_task_id
# 调用格式: sh ftp_getTxq_users.sh 20240114
dateNo=$1
LOCALDIR="/date/localPath/SBXDS_ACC_M"
cd $LOCALDIR
newfiles=$(cat SBXDS_ACC_M_NEWFILES.txt|wc -l)
if [ $newfiles -eq 0 ]; then
echo "无新增文件。"
exit
fi
fileList=()
for filename in $(cat SBXDS_ACC_M_NEWFILES.txt|sed 's/\r$//')
do
task_id=$(echo $filename|cut -d'_' -f4)
echo "文件名: $filename,任务ID: $task_id"
echo "执行命令:hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
hadoop fs -mkdir hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
if [ $? -eq 0 ]; then
echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
else
echo "分区 $task_id 已存在。"
echo "执行命令:hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id"
hadoop fs -put -f $filename hdfs://ns2/data//dods/dods_ftp_target_users/task_id=$task_id
fi
# 匹配通信圈用户
hive -e"
MSCK REPAIR TABLE dods_ftp_target_users;
insert overwrite table dapp_ftp_txq_users partition(task_id='${task_id}')
select a.phone_no,
b.msg_info
from dods_ftp_target_users a,
dods_msg_info b
wherea.phone_no=b.phone_no
and a.task_id='${task_id}'
and b.deal_day='${dateNo}'
groupby a.phone_no,
b.msg_info
"
# 结果数据导入文件
hive -e"
select concat_ws(',',phone_no,msg_info)
from dapp_ftp_txq_users
where task_id = '${task_id}'
" > WTF_YBZ_DSGS_${task_id}
# 文件记录数
file_rows=$(cat WTF_YBZ_DSGS_${task_id}|wc -l)
msgs="源文件$filename的结果文件WTF_YBZ_DSGS_${task_id},记录数$file_rows。"
fileList+=$msgs
done
echo "************************* 执行成功 *************************"
for msg in ${fileList[*]}
do
echo $msg
doneftp_getTxq_users.sh#!/bin/bash# ftp_sftpPutFiles.sh# ip2 执行# 调用格式 sh ftp_sftpPutFiles.sh# 定义函数,sftp上传文件function sftpPutFiles(){FILENAME=$1PORT=22HOST="ip1"USERNAME="uname1"PASSWORD="123456"LOCALDIR="/date/localPath/SBXDS_ACC_M"SFTPDIR="/ftpdata/orderringlist"cd $LOCALDIR/usr/bin/expect
页:
[1]