1、paramiko自动登录网络设备抓取配置信息
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import paramiko
# Open an SSH session to the device and start an interactive shell channel.
ssh = paramiko.SSHClient()
# Accept host keys that are not already known instead of refusing to connect.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
    hostname='192.168.1.2',
    port=22,
    username='cisco',
    password='cisco',
)
# `client` is the shell channel that run_cmd() sends to and reads from.
client = ssh.invoke_shell()
def run_cmd(cmd, endswith, channel=None):
    """Send a command over a shell channel and read until a terminator.

    Args:
        cmd: Text to send, including the trailing newline.
        endswith: Terminator string (or tuple of strings); reading stops
            once the received output ends with it.
        channel: Object providing ``send`` and ``recv``. Defaults to the
            module-level ``client`` shell channel.

    Returns:
        The received output decoded as UTF-8 (undecodable bytes are
        replaced). If the remote side closes the channel before the
        terminator is seen, whatever was received so far is returned.
    """
    chan = client if channel is None else channel
    if isinstance(endswith, tuple):
        terminator = tuple(item.encode('utf-8') for item in endswith)
    else:
        terminator = endswith.encode('utf-8')
    # Accumulate raw bytes and decode once at the end, so a multibyte
    # character split across two recv() calls cannot break decoding.
    received = bytearray()
    chan.send(cmd)
    while not received.endswith(terminator):
        chunk = chan.recv(1024)
        if not chunk:
            # An empty read means the channel was closed; stop instead of
            # spinning forever waiting for a terminator that cannot arrive.
            break
        received += chunk
    return received.decode('utf-8', errors='replace')
# Enter privileged mode, turn off output paging, then dump the running
# configuration. Each step waits for the terminator the device is expected
# to print next.
try:
    res = ''.join([
        run_cmd('enable\n', 'Password: '),
        run_cmd('cisco\n', '#'),
        run_cmd('terminal length 0\n', '#'),
        run_cmd('show run\n', '#'),
    ])
    print(res)
finally:
    # Close the session even when one of the commands raised.
    ssh.close()
# Per-vendor paging commands: <prompt> disable paging  # restore paging (vendor)
# <H3C> screen-length disable # no screen-length disable (华三)
# Cisco# terminal length 0 # terminal no length (思科)
# MaiPu# more off # more on (迈普)
# <HuaWei> screen-length 0 temporary # undo screen-length temporary (华为)
# RuiJie# terminal length 0 # terminal no length (锐捷)
# ZTE# terminal length 0 # no terminal length (中兴)
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException
def sshconnect(command, hostname):
    """Log in to one host over SSH and send a series of shell commands.

    After each command the function reads from the shell channel until the
    received output ends with ``#``. The output itself is not returned.

    :param command: list of command strings; a single string is treated as
        one command
    :param hostname: IP address of a single host
    :return: ``{'status': False, 'ip': hostname}`` when the connection
        cannot be established, times out or authentication fails;
        otherwise ``None``
    """
    # A bare string would otherwise be iterated character by character.
    commands = [command] if isinstance(command, str) else command
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        try:
            ssh.connect(hostname=hostname, port=22, username='USERNAME', password='PASSWORD')
        except (NoValidConnectionsError, TimeoutError, AuthenticationException):
            return {'status': False, 'ip': hostname}
        client = ssh.invoke_shell()
        for line in commands:
            client.send(line + '\n')
            # Only the last received byte matters for the '#' check, so the
            # full output does not need to be kept or decoded.
            tail = b''
            while tail != b'#':
                chunk = client.recv(1024)
                if not chunk:
                    # Empty read: the channel was closed, so no prompt will
                    # ever arrive and further commands cannot be sent.
                    return None
                tail = chunk[-1:]
    finally:
        # Release the session on every path, including failures.
        ssh.close()
if __name__ == '__main__':
    # Commands are passed as a list of strings, one entry per command.
    sshconnect(['show version', 'show run'], '192.168.1.1')
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import paramiko
from .conndb import condb
class comupload(object):
    """SFTP file transfer and remote command execution against one host.

    Construction opens two password-authenticated connections: a
    ``paramiko.Transport`` that backs the SFTP client, and a
    ``paramiko.SSHClient`` used to run commands. Call :meth:`sshclose`
    when done.
    """

    def __init__(self, hostname, project=None, username='root', port=22, password='123456'):
        """Connect to ``hostname`` and prepare the SFTP and SSH clients.

        :param hostname: host to connect to
        :param project: stored on the instance as ``self.project``
        :param username: login user
        :param port: SSH port
        :param password: login password
        :raises Exception: whatever paramiko raises while connecting; any
            connection opened so far is closed before re-raising
        """
        # Key-based login alternative: load a key with
        # paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') and pass
        # pkey=<key> instead of password= to both connect() calls below.
        self.hostname = hostname
        self.username = username
        self.port = port
        self.project = project
        self.password = password
        self.transport = paramiko.Transport((self.hostname, self.port))
        try:
            self.transport.connect(username=self.username, password=self.password)
            self.sftp = paramiko.SFTPClient.from_transport(self.transport)
            self.client = paramiko.SSHClient()
            # Allow connecting to hosts that are not in the known_hosts file.
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.client.connect(hostname=self.hostname, port=self.port,
                                username=self.username, password=self.password)
        except Exception:
            # Do not leave a half-open session behind when setup fails.
            self.sshclose()
            raise

    def upload(self, local_path, remote_path):
        """Upload a local file to the server, printing a progress bar."""
        self.sftp.put(local_path, remote_path, callback=self.__callback)

    def download(self, remotepath, localpath):
        """Download a remote file to the local machine, printing a progress bar."""
        self.sftp.get(remotepath, localpath, callback=self.__callback)

    def comand(self, com):
        """Run a command and return ``(stdout, stderr)`` as decoded strings."""
        stdin, stdout, stderr = self.client.exec_command(com)
        result = stdout.read().decode()
        reserr = stderr.read().decode()
        return result, reserr

    def exec_com(self, com):
        """Run a command and return ``(stdout, stderr, exit_code)``.

        stdout and stderr are returned as read from the channel file
        objects; they are not decoded here.
        """
        self.channel = self.client.get_transport().open_session()
        try:
            self.channel.exec_command(com)
            # NOTE(review): stdout is drained completely before stderr is
            # read; a command producing a lot of stderr output may stall
            # here - confirm against the commands actually used.
            stdout = self.channel.makefile().read()
            stderr = self.channel.makefile_stderr().read()
            exit_code = self.channel.recv_exit_status()
        finally:
            # Close the session channel even when reading fails.
            self.channel.close()
        return stdout, stderr, exit_code

    def __callback(self, send, total):
        """Print a one-line transfer progress bar.

        :param send: number of bytes transferred so far
        :param total: total number of bytes to transfer
        :return: None
        """
        # A zero-byte transfer is reported as complete instead of dividing
        # by zero.
        percent = send * 100 / total if total else 100.0
        end = '' if send != total else '\n'
        # 50-character bar: one '#' per 2%; '\r' redraws the same line.
        print('\r |{:<50}| {:.2f} M [{:.2f}%]'.format('#' * int(percent / 2), send / 1024 / 1024, percent), end=end, flush=True)
        # Progress could also be persisted per project here, e.g. through
        # condb() with an UPDATE of the `sendsize` and `percent` columns of
        # the `project` table for self.project.

    def sshclose(self):
        """Close the SFTP client, the SSH client and the SFTP transport.

        Every resource is attempted even if an earlier close fails; the
        first failure is re-raised afterwards.
        """
        first_error = None
        for name in ('sftp', 'client', 'transport'):
            resource = getattr(self, name, None)
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as err:
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error
if __name__ == '__main__':
    # Demo: upload a file, then run a command through both execution helpers.
    ssh_sftp = comupload('172.17.0.5')
    try:
        ssh_sftp.upload('test.tar.gz', '/root/test.tar.gz')
        # `err` rather than `re`, which would shadow the stdlib module name.
        r, err = ssh_sftp.comand('echo hello')
        res, reserr, exit_code = ssh_sftp.exec_com('echo helloworld')
    finally:
        # Close the connections even if a step above failed.
        ssh_sftp.sshclose()
#!/usr/bin/python3.8
# -*- coding:UTF-8 -*-
# pip install func-timeout
# 执行命令返回命令结果
import re
import time
from paramiko import SSHClient, AutoAddPolicy
from func_timeout import func_set_timeout, exceptions
class RemoteCMD():
    """Run a command on a remote host through an interactive SSH shell."""

    def _init_connection(self,
                         ip: str,
                         port: int,
                         user: str,
                         passwd: str,
                         timeout: int = 5):
        """Open the SSH connection and start an interactive shell.

        Kept separate from ``_send_cmd`` so that a timeout while running the
        command does not leave the session unclosed.

        Args:
            ip (str): IP address.
            port (int): SSH port.
            user (str): User name.
            passwd (str): Password.
            timeout (int, optional): Connect timeout in seconds. Defaults to 5.

        Returns:
            The shell channel returned by ``SSHClient.invoke_shell()``.
        """
        ssh = SSHClient()
        # ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(AutoAddPolicy())
        try:
            ssh.connect(hostname=ip,
                        port=int(port),
                        username=user,
                        password=passwd,
                        timeout=timeout,
                        allow_agent=False,
                        look_for_keys=False)
            return ssh.invoke_shell()
        except Exception:
            # Release the connection if the shell could not be opened.
            ssh.close()
            raise

    def _match_prompt(self, content: str, regex: str):
        """Check whether the end prompt appears in the output.

        Args:
            content (str): Text to search.
            regex (str): Keyword or regular expression to look for.

        Returns:
            bool: True if ``regex`` matches anywhere in ``content``.
        """
        return re.search(regex, content) is not None

    @func_set_timeout(5)
    def _send_cmd(self,
                  client: object,
                  cmd: str,
                  recv_end_prompt: str,
                  recv_size: int = 512):
        """Send a command and collect output until the end prompt matches.

        Args:
            client (object): Shell channel from ``_init_connection``.
            cmd (str): Command to run.
            recv_end_prompt (str): Regular expression marking the end of
                the output.
            recv_size (int, optional): Bytes requested per read.
                Defaults to 512.

        Returns:
            str: Output received up to and including the end prompt.

        Raises:
            EOFError: The channel was closed before the prompt was seen.
        """
        import codecs

        client.send(f"{str(cmd).strip()}\n")
        # Wait until the channel has data to read.
        while not client.recv_ready():
            time.sleep(0.2)
        # Incremental decoding keeps a multibyte character that is split
        # across two reads intact; undecodable bytes are replaced.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        result = ""
        while not self._match_prompt(result, recv_end_prompt):
            chunk = client.recv(recv_size)
            if not chunk:
                # Empty read: the channel is closed, so the prompt can never
                # arrive. Fail now instead of spinning until the timeout.
                raise EOFError("SSH channel closed before the end prompt was received")
            result += decoder.decode(chunk)
        return result

    def command(self,
                ip: str,
                port: int,
                user: str,
                passwd: str,
                cmd: str,
                end_prompt: str = r"#\s$",
                timeout: int = 5):
        """Run a command on a remote host.

        Args:
            ip (str): IP address.
            port (int): SSH port.
            user (str): User name.
            passwd (str): Password.
            cmd (str): Command to run.
            end_prompt (str, optional): Regular expression that marks the
                end of the command output. Defaults to ``#\\s$``.
            timeout (int, optional): Timeout in seconds, passed to the
                connection step. The execution step is limited separately
                by the fixed ``func_set_timeout(5)`` on ``_send_cmd``.
                Defaults to 5.

        Returns:
            tuple: (True/False, command output or error message).
        """
        client = None
        try:
            client = self._init_connection(ip, port, user, passwd, timeout)
        except Exception as err:  # pylint: disable=broad-except
            return False, f"SSH 连接出现异常:{err}"
        try:
            return True, self._send_cmd(client, cmd, end_prompt)
        except exceptions.FunctionTimedOut:
            return False, "SSH 执行出现超时错误!"
        except Exception as err:  # pylint: disable=broad-except
            return False, f"SSH 执行出现其他异常:{err}"
        finally:
            if client:
                # Close the underlying transport as well as the channel, so
                # the SSH connection itself is released.
                transport = client.get_transport()
                client.close()
                if transport is not None:
                    transport.close()
if __name__ == '__main__':
    # Demo: create a file on the remote host and print the (status, output)
    # tuple returned by command().
    sshclient = RemoteCMD()
    outcome = sshclient.command(
        ip='172.18.188.5',
        port=22,
        user='root',
        passwd='123456',
        cmd='touch test.txt',
    )
    print(outcome)
2、scapy模块(网络扫描模块)
pip install scapy #安装
from scapy.all import * #导入
ls():显示支持的所有协议
lsc():显示支持的所有命令(即发送、接收数据包的函数)
help(协议|命令):显示协议或命令的详细帮助
注意:
协议、命令不加单引号或者双引号,help(SNMP)、help(tcpdump)
[root@localhost ~]# git clone https://github.com/secdev/scapy.git
[root@localhost ~]# cd scapy/
[root@localhost ~]# ./run_scapy
相关链接:
https://github.com/tecstack/forward https://scapy.net
#requests请求
#paramiko模块使用
https://github.com/fgimian/paramiko-expect/blob/master/examples/paramiko_expect-demo.py # paramiko_expect模块
https://blog.51cto.com/lizhenliang/1880856 # paramiko、fabric与pexpect