Python远程连接ssh、在后台执行命令、非root权限的测试账号执行sudo命令时需要输入密码

您所在的位置:网站首页 ssh默认账号密码 Python远程连接ssh、在后台执行命令、非root权限的测试账号执行sudo命令时需要输入密码

Python远程连接ssh、在后台执行命令、非root权限的测试账号执行sudo命令时需要输入密码

2023-07-05 16:35| 来源: 网络整理| 查看: 265

1、环境准备

先安装pycryptodome,cryptography,再安装paramiko。 (较早的教程说安装pycrypto,但pycrypto已经停止维护。)

# 安装pycryptodome pip3 install pycryptodome # 安装cryptography pip3 install cryptography # 安装paramiko pip3 install paramiko 2、Python远程ssh服务器的核心方法 2.1 Python远程连接ssh服务器、执行上传文件、下载文件、在后台执行命令

paramiko模块——ssh远程连接服务器并执行命令 Python模块学习 - Paramiko python操作ssh上传和下载文件 使用 Python ssh 远程登陆服务器的最佳方案 Python—实现ssh客户端(连接远程服务器)

2.2 在后台执行命令并保存运行日志

有些命令执行时间较长,要使用nohup让命令执行时不被挂断在后台运行 Linux nohup 命令 命令格式:

nohup 要执行的command命令 > 运行日志.log 2>&1 &

上述格式可以在直接在终端运行。但是通过Python脚本运行时,最后的&符号总被吞掉导致报错。百思不得其解,尝试给&加转义符终于成功。即为:\&

# python 脚本远程执行命令,在后台运行并保存运行日志 os.popen('nohup 要执行的command命令 > 运行日志.log 2>&1 \&') 2.3 python远程ssh执行sudo命令时需要输入密码

非root权限的测试账号执行命令时需要加sudo,需要输入密码。

echo '服务器密码' | sudo -S command命令 对于2.2和2.3举例如下 cmd = "nohup echo '服务器账号的密码' | sudo -S docker exec -i {} bash -i -c \"python run.py --action create_video_from_audio --model_path models/{} --data_root /data --output_file test_results/{} --audio_path test/{} --audio_language 'zh-CN' \" > creat_video.log 2>&1 \&".format( digital_server, model_name, test_result,audio_name) execute_command(cmd) # execute_command()已封装的执行command命令的方法 2.3 将控制台输出保存到文件 # 将控制台输出到日志文件中,日志是追加模式,记得定时清理 # 便于检验数据获取是否正确。也可将这部分删除 class Logger(object): def __init__(self, filename='default.log', stream=sys.stdout): self.terminal = stream self.log = open(filename, 'a+') def write(self, message): self.terminal.write(message) self.log.write(message) def flush(self): pass # 将控制台输出存入日志文件 sys.stdout = Logger('../log/stout_log.log', sys.stdout) sys.stderr = Logger('../log/stout.log_file', sys.stderr) 3、完整代码

业务场景: 数字人:上传视频和音频到服务器,将视频训练成模型,再将音频和生成的模型合成视频产物,下载视频产物到本地,人工检查效果

#!/usr/bin/env python3 # -*- coding:utf-8 -*- # @Time : 2022/6/7 10:38 # @Author : Dora import copy import pytest import paramiko, os, pexpect, sys, itertools, time from time import sleep from scp import SCPClient from log.print_log import Logger # 将控制台输出存入日志文件 sys.stdout = Logger('../log/stout_log.log', sys.stdout) sys.stderr = Logger('../log/stout.log_file', sys.stderr) def execute_command(command): ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 建立连接 ssh.connect("47.100.74.174", username="aitest", port=22, password="zegoai@test") # 使用这个连接执行命令 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command) # 关闭连接 sleep(1) ssh.close() return ssh_stdout def upload_file(): # 上传文件 ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy) ssh_client.connect('47.100.74.174', 22, 'aitest', 'zegoai@test') scp_client = SCPClient(ssh_client.get_transport(), socket_timeout=15.0) local_path = '/Users/zego/Documents/digital_human/audio/' # 上传的文件的本地路径 """ audios = ['Chinese-finance-male-30s.WAV', 'English-Hillary-27s.WAV', 'English-male-13s.WAV', 'Female-English-Young-smooth-33s.WAV', 'chinese-male-14mins.WAV', 'football-broadcast-male-HeWei-32s.WAV', 'news-broadcast-male-21s.WAV', 'Spanish-female-36s.wav', 'Spanish-male-12s.WAV', 'Spanish-male-15s.WAV'] """ audios = ['614_1-50s.wav', 'yy_mp3_demoise-1min17s.WAV'] upload_server_path = '/home/aitest/' # 上传到服务器的目标路径 for audio in audios: audio_path = local_path + audio try: scp_client.put(audio_path, upload_server_path) except FileNotFoundError as e: print(e) print("系统找不到指定文件" + local_path) else: print("文件上传成功") mv_cmd = "echo 'zegoai@test' | sudo -S mv {} /data/digital_datas/test/".format(audio) print(mv_cmd) execute_command(mv_cmd) ssh_client.close() def create_model_from_video(digital_server='digital_server-2023-06-15-16-32', video_name=''): # 用视频生成model start_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) # cmd: create_model_from_video # model路径:/data/digital_datas/model result_json = video_name + '-' + start_time + '-' + 'creat_model-dora-test.json' # debug=True,debug模式生成model,可在5分钟左右完成,用于测试。正式模式需要5小时左右。 # cmd = "nohup echo 'zegoai@test' | sudo -S docker exec -i digital_server-2023-06-09-15-06 bash -i -c "python run.py --action create_model_from_video --video_path test/yy_min.mp4 --data_root /data --output_file test_results/yy_min.json debug=True" > creat_model.log 2>&1 &" cmd = "nohup echo 'zegoai@test' | sudo -S docker exec -i {} bash -i -c \"python run.py --action create_model_from_video --video_path test/{} --data_root /data --output_file test_results/{}\" > creat_model.log 2>&1 \&".format( digital_server, video_name, result_json) print("执行create_model_from_video命令:\n", cmd) sleep(1) execute_command(cmd) # 生成结果的json路径:/data/digital_datas/results # 生成的mode的路径:/data/digital_datas/model model_id = '' while model_id == '': # 正式训练等待30min,一般生成model需要5小时。debug模式需要5分钟左右 sleep(120) # 读取log,获取生成model的id log_cmd = 'cat /data/digital_datas/test_results/%s' % result_json log = execute_command(log_cmd) # .read().decode('utf-8') for line in log.readlines(): # print("re line:",line) if 'model_path' in line: print('line:', line) model_id = line.split('models/')[1].replace('"', '').strip('\n') print(model_id) end_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) print('运行create_model_from_video()开始时间:', start_time) print('运行create_model_from_video()结束时间:', end_time) # 执行create_model_from_video过程的日志 run_log = execute_command('cat /home/aitest/creat_model.log').read().decode('utf-8') print('\n~*~*~*~*~*~执行create_model_from_video过程的日志~*~*~*~*~*~\n', run_log) result_log = execute_command(log_cmd).read().decode('utf-8') # 生成的视频的json信息 print('\n~*~*~*~*~*~生成的model的json信息~*~*~*~*~*~\n', result_log) # 为了将create日志打印出来保存到日志记录文件 # 删除json文件,避免下次有同名称的json文件存在导致错误 """ cmd = "echo 'zegoai@test' | sudo -S rm /data/digital_datas/test_results/%s" % result_json print(cmd) execute_command(cmd) """ return model_id def create_video_from_audio(digital_server='digital_server-2023-06-15-16-32', model_name='', audio_name=''): # 用音频和model生成视频 start_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) # cmd: create_video_from_audio # model路径:/data/digital_datas/model result_json = model_name + '-' + audio_name + '-' + start_time + '-dora-test.json' # cmd = "nohup echo 'zegoai@test' | sudo -S docker exec -i digital_server-2023-06-08-15-21 bash -i -c \"python run.py --action create_video_from_audio --model_path models/001636ac-2c7d-4eb3-ab0b-05a58ee668df.zip --data_root /data --output_file test_results/001636ac-2c7d-4eb3-ab0b-05a58ee668df-dora-test.json --audio_path test/b441d222-7526-491a-8bbc-5dad02e00d75.MP3.wav --audio_language 'zh-CN' \" > creat_video9.log 2>&1 \&" cmd = "nohup echo 'zegoai@test' | sudo -S docker exec -i {} bash -i -c \"python run.py --action create_video_from_audio --model_path models/{} --data_root /data --output_file test_results/{} --audio_path test/{} --audio_language 'zh-CN' \" > creat_video.log 2>&1 \&".format( digital_server, model_name, result_json, audio_name) print(cmd) sleep(1) execute_command(cmd) # 生成结果的json路径:/data/digital_datas/test_results # 生成的视频路径:/data/digital_datas/results/ video_name = '' while video_name == '': sleep(20) # 读取log,获取生成的视频名称 log_cmd = 'cat /data/digital_datas/test_results/%s' % result_json log = execute_command(log_cmd) # .read().decode('utf-8') for line in log.readlines(): # print("re line:",line) if 'video_path' in line: print('line:', line) video_name = line.split('results/')[1].replace('"', '').strip('\n') print(video_name) end_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) print('运行create_video_from_audio()开始时间:', start_time) print('运行create_video_from_audio()结束时间:', end_time) run_log = execute_command('cat /home/aitest/creat_video.log').read().decode( 'utf-8') # 执行creat_video_frome_audio过程的日志 print('\n~*~*~*~*~*~执行creat_video_frome_audio过程的日志~*~*~*~*~*~\n', run_log) result_log = execute_command(log_cmd).read().decode('utf-8') # 生成的视频的json信息 print('\n~*~*~*~*~*~生成的视频的json信息~*~*~*~*~*~\n', result_log) # 为了将create日志打印出来保存到日志记录文件 """ # 删除json文件,避免下次有同名称的json文件存在导致错误 cmd = "echo 'zegoai@test' | sudo -S rm /data/digital_datas/test_results/%s" % result_json print(cmd) execute_command(cmd) """ return video_name def download_file(video_name): """ 下载文件 """ current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) print('运行download_file()开始时间:', current_time) # 获取create_model_from_video生成视频的路径 # video_name = create_model_from_video() # video_name = 'fe914398-fc13-4806-97ae-1beceef26ec6.mp4' ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy) ssh_client.connect('47.100.74.174', 22, 'aitest', 'zegoai@test') scp_client = SCPClient(ssh_client.get_transport(), socket_timeout=15.0) origin_path = '/data/digital_datas/results/' + video_name print('origin_path:', origin_path) try: scp_client.get(origin_path, '/Users/zego/Documents/digital_human/result_video/') except FileNotFoundError as e: print(e) print("系统找不到指定路径" + '/Users/zego/Documents/digital_human/result_video/') else: print("文件下载成功") ssh_client.close() def cartesian_models_audios(): current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) print('运行cartesian_models_audios()开始时间:', current_time) # 获取模型和音频的笛卡尔积 # '48dbc6a2-6b60-4961-a304-bc576dd38c25.zip'--不能用 # ['7a5f056c-131e-43c6-b813-ccbb65131b2d.zip']---嘴全黑 # 'ff640dee-1ef3-47bb-899b-52655ff399d2.zip' YY老师-绿幕,可用 # '1a3d901d-8e7f-47a3-9369-d696c6998c93.zip' YY老师-办公室,可用 # '001636ac-2c7d-4eb3-ab0b-05a58ee668df.zip' 苗族妹子,可用 # models = ['001636ac-2c7d-4eb3-ab0b-05a58ee668df.zip',,'26857271-7a9e-4b57-9338-b63b11e9362c.zip','761fc797-7905-4292-88ed-b4f7f75161df.zip','7a5f056c-131e-43c6-b813-ccbb65131b2d.zip','7f8fe127-6a54-4227-8f4e-11722ac90a6c.zip','906e494b-1257-4a7d-93ac-1e5b348d0c9c.zip','a11278ea-fbd4-45e1-9809-0eec28623bec.zip'] models = ['yy_model.zip' ]# ['yy_model.zip','2d061f47-4d23-43e2-8b1a-efe451bb9f02.zip','1a3d901d-8e7f-47a3-9369-d696c6998c93.zip','001636ac-2c7d-4eb3-ab0b-05a58ee668df.zip', '50836fa0-98c9-480e-89c1-62879ccfb0ba.zip','9363c572-3f6f-4f47-ba41-1af286e2687c.zip','7f8fe127-6a54-4227-8f4e-11722ac90a6c.zip','26857271-7a9e-4b57-9338-b63b11e9362c.zip','761fc797-7905-4292-88ed-b4f7f75161df.zip','001636ac-2c7d-4eb3-ab0b-05a58ee668df.zip'] audios = ['Chinese-finance-male-30s.WAV', 'English-Hillary-27s.WAV', 'English-male-13s.WAV', 'Female-English-Young-smooth-33s.WAV', 'football-broadcast-male-HeWei-32s.WAV', 'news-broadcast-male-21s.WAV', 'Spanish-female-36s.wav', 'Spanish-male-12s.WAV', 'Spanish-male-15s.WAV', 'chinese-male-15s.WAV','Spanish-female-30S.wav','weather-female-30s.WAV','chinese-male-14mins.WAV'] # audios = ['614_1-50s.wav', 'yy_mp3_demoise-1min17s.WAV'] audios = ['English-Hillary-27s.WAV', 'English-male-13s.WAV','Chinese-finance-male-30s.WAV','614_1-50s.wav', 'yy_mp3_demoise-1min17s.WAV'] # audios = ['Spanish-female-30S.wav', 'weather-female-30s.WAV','b441d222-7526-491a-8bbc-5dad02e00d75.MP3.wav', 'driving_audio_3.wav', 'driving_audio_6.wav','xiaoyu3.wav', 'xiaoyu.wav', 'chinese-male-15s.WAV'] # audios = ['chinese-male-15s.WAV'] # 'Spanish-female-30S.wav','Female-English-Young-smooth.WAV','b441d222-7526-491a-8bbc-5dad02e00d75.MP3.wav','driving_audio_3.wav','driving_audio_6.wav','xiaoyu3.wav','xiaoyu.wav'] # audios = ['b441d222-7526-491a-8bbc-5dad02e00d75.MP3.wav'] product = [] for cartesian in itertools.product(models, audios): print('cartesian:', cartesian) product.append(cartesian) print('product_models_audios:', product) return product def batch_create_video_download(): # 流程:推理视频-下载视频(支持批量) # 使用多组models和audios生成视频,并下载视频到本地 product = cartesian_models_audios() print(product) print("*************************批量执行*************************") id = 1 for group in product: print("~*~*~*~第{}次create_video_from_audio,本次使用的model和audio:{}~*~*~*~".format(id, group)) model = group[0] audio = group[1] video_name = create_video_from_audio(model_name=model, audio_name=audio) download_file(video_name) id = id + 1 def creat_model_video_download(): # 全流程:训练模型-推理视频-下载视频(支持批量) # 步骤1:生成model:create_model_from_video,支持1个和多个视频生成model,执行完1个再执行下一个 videos = ['1111.mp4', 'dongzuo_3.mp4'] models = [] id = 1 for video in videos: print("~*~*~*~第{}次生成model,本次使用的video:{}~*~*~*~".format(id, video)) model_id = create_model_from_video(video_name=video) models.append(model_id) id = id + 1 audios = ['Spanish-female-30S.wav', 'Female-English-Young-smooth.WAV', 'b441d222-7526-491a-8bbc-5dad02e00d75.MP3.wav', 'driving_audio_3.wav', 'driving_audio_6.wav', 'xiaoyu3.wav', 'xiaoyu.wav', 'chinese-male-15s.WAV'] # audios = ['b441d222-7526-491a-8bbc-5dad02e00d75.MP3.wav'] product = [] for cartesian in itertools.product(models, audios): product.append(cartesian) i = 1 for group in product: print("~*~*~*~第{}次用新生成的model生成video,本次使用的model和audio:{}~*~*~*~".format(i, group)) model = group[0] audio = group[1] # 步骤2:生成视频:create_video_from_audio,并获取video路径 video_name = create_video_from_audio(model_name=model, audio_name=audio) # 下载video到本地 download_file(video_name) i = i + 1 if __name__ == '__main__': print("~~~~~~~~~~~~~~~~~~~~~~~~~~~当前运行~~~~~~~~~~~~~~~~~~~~~~~~~~~") # 上传文件 # upload_file() # cartesian_models_audios() # download_file() # 流程:推理视频-下载视频(支持批量) batch_create_video_download() # 全流程:训练模型-推理视频-下载视频(支持批量) # creat_model_video_download() """ cmd='ls -ll' result = execute_command(cmd).read().decode('utf-8') print(result) """ 一些报错处理

文件夹或者文件无权限,授予权限:sudo chmod 777 文件名 Python报错:PermissionError: [Errno 13] Permission denied解决方案详解

解决’/usr/local/lib/libffi.7.dylib’ (no such file) pip3 install --force-reinstall cryptography 如果没有权限,加sudo sudo pip3 install --force-reinstall cryptography



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3