本文目录导读:

我来为您介绍几种Python实现端口扫描的方法,从简单到高级。
基础TCP连接扫描
import socket
import threading
from datetime import datetime
def scan_port(host, port):
"""扫描单个端口"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1) # 设置超时时间
result = sock.connect_ex((host, port))
sock.close()
if result == 0:
return True
return False
except Exception as e:
return False
def simple_scan(host, port_range):
"""简单端口扫描"""
print(f"开始扫描 {host}")
print(f"扫描范围: {port_range[0]} - {port_range[1]}")
print("-" * 50)
start_time = datetime.now()
open_ports = []
for port in range(port_range[0], port_range[1] + 1):
if scan_port(host, port):
print(f"端口 {port} 开放")
open_ports.append(port)
end_time = datetime.now()
print("-" * 50)
print(f"扫描完成,耗时: {end_time - start_time}")
print(f"发现 {len(open_ports)} 个开放端口: {open_ports}")
# 使用示例
if __name__ == "__main__":
# 扫描本地主机的前10个端口
simple_scan("127.0.0.1", (1, 10))
多线程端口扫描器
import socket
import threading
from queue import Queue
import sys
class PortScanner:
def __init__(self, host, start_port, end_port, threads=100):
self.host = host
self.start_port = start_port
self.end_port = end_port
self.threads = threads
self.open_ports = []
self.queue = Queue()
def scan_port(self, port):
"""线程工作函数"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
result = sock.connect_ex((self.host, port))
sock.close()
if result == 0:
self.open_ports.append(port)
self.get_port_service(port)
except:
pass
def get_port_service(self, port):
"""获取端口服务名称"""
try:
service = socket.getservbyport(port)
except:
service = "未知服务"
print(f"端口 {port} 开放 - {service}")
def worker(self):
"""工作线程"""
while not self.queue.empty():
port = self.queue.get()
self.scan_port(port)
self.queue.task_done()
def run(self):
"""执行扫描"""
print(f"开始扫描 {self.host}")
print(f"扫描端口范围: {self.start_port} - {self.end_port}")
print(f"线程数: {self.threads}")
print("-" * 50)
# 创建端口队列
for port in range(self.start_port, self.end_port + 1):
self.queue.put(port)
# 启动线程
for _ in range(self.threads):
t = threading.Thread(target=self.worker)
t.daemon = True
t.start()
# 等待所有线程完成
self.queue.join()
print("-" * 50)
print(f"扫描完成! 发现 {len(self.open_ports)} 个开放端口")
# 使用示例
if __name__ == "__main__":
scanner = PortScanner("127.0.0.1", 1, 1000, threads=200)
scanner.run()
高级端口扫描器(带服务识别)
import socket
import argparse
import concurrent.futures
import ipaddress
from colorama import init, Fore, Style
init(autoreset=True)
class AdvancedPortScanner:
def __init__(self):
self.services = {
20: "FTP-DATA", 21: "FTP", 22: "SSH", 23: "Telnet",
25: "SMTP", 53: "DNS", 80: "HTTP", 110: "POP3",
143: "IMAP", 443: "HTTPS", 3306: "MySQL", 3389: "RDP",
5432: "PostgreSQL", 6379: "Redis", 8080: "HTTP-Proxy",
8443: "HTTPS-Alt"
}
self.common_ports = list(self.services.keys())
def scan_port(self, host, port, timeout=1):
"""扫描端口并尝试识别服务"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
# 获取服务信息
service = self.get_service_banner(sock, port)
sock.close()
return port, True, service
sock.close()
return port, False, None
except Exception as e:
return port, False, str(e)
def get_service_banner(self, sock, port):
"""尝试获取服务banner"""
service_name = self.services.get(port, "Unknown")
# 尝试发送HTTP请求获取信息
if port in [80, 8080, 443, 8443]:
try:
sock.send(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")
banner = sock.recv(1024)
return f"{service_name} ({banner[:50].decode(errors='ignore').strip()})"
except:
pass
# 尝试获取SSH版本信息
if port == 22:
try:
banner = sock.recv(1024)
return f"{service_name} ({banner[:50].decode(errors='ignore').strip()})"
except:
pass
return service_name
def scan_ports_concurrent(self, host, ports, max_workers=100):
"""并发扫描端口"""
open_ports = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建扫描任务
future_to_port = {
executor.submit(self.scan_port, host, port): port
for port in ports
}
# 处理结果
for future in concurrent.futures.as_completed(future_to_port):
port = future_to_port[future]
try:
port_num, is_open, service = future.result()
if is_open:
open_ports.append((port_num, service))
print(f"{Fore.GREEN}[开放] 端口 {port_num}: {service}")
else:
print(f"{Fore.RED}[关闭] 端口 {port_num}", end="\r")
except Exception as e:
print(f"{Fore.YELLOW}[错误] 端口 {port}: {e}")
return sorted(open_ports)
def scan_target(self, target, port_range=None):
"""扫描目标"""
# 解析目标
try:
# 处理IP地址或域名
host = socket.gethostbyname(target)
print(f"{Fore.CYAN}目标解析: {target} -> {host}")
except:
print(f"{Fore.RED}无法解析目标: {target}")
return
# 确定扫描端口
if port_range:
ports = list(range(port_range[0], port_range[1] + 1))
else:
ports = self.common_ports
print(f"{Fore.CYAN}开始扫描 {host}")
print(f"{Fore.CYAN}扫描端口数: {len(ports)}")
print("=" * 60)
open_ports = self.scan_ports_concurrent(host, ports)
print("=" * 60)
print(f"{Fore.GREEN}扫描完成!")
print(f"{Fore.YELLOW}发现 {len(open_ports)} 个开放端口:")
for port, service in open_ports:
print(f" {Fore.WHITE}{port}/tcp {Fore.BLUE}{service}")
def main():
parser = argparse.ArgumentParser(description="高级端口扫描器")
parser.add_argument("target", help="扫描目标 (IP或域名)")
parser.add_argument("-p", "--ports", help="端口范围 (如: 1-1000)")
parser.add_argument("-t", "--threads", type=int, default=100, help="线程数 (默认: 100)")
args = parser.parse_args()
# 解析端口范围
port_range = None
if args.ports:
try:
start, end = map(int, args.ports.split("-"))
port_range = (start, end)
except:
print("端口格式错误, 使用常见端口")
scanner = AdvancedPortScanner()
scanner.scan_target(args.target, port_range)
if __name__ == "__main__":
main()
使用示例
基础使用:
# 扫描常见端口 python port_scanner.py 127.0.0.1 # 扫描指定端口范围 python port_scanner.py 192.168.1.1 -p 1-1000 # 并行扫描(增加速度) python port_scanner.py example.com -p 1-65535 -t 500
Python代码直接调用:
# 使用基础扫描器
from port_scanner import simple_scan
simple_scan("192.168.1.1", (1, 100))
# 使用高级扫描器
from port_scanner_advanced import AdvancedPortScanner
scanner = AdvancedPortScanner()
scanner.scan_target("scanme.nmap.org")
注意事项
- 合法性:只扫描自己拥有或授权的系统
- 限制速度:过快扫描可能被防火墙检测或触发安全警报
- 异常处理:网络连接可能不稳定,需要适当的超时和重试机制
- 性能优化:使用线程池或异步IO提高扫描效率
这些示例覆盖了从基础到高级的端口扫描实现,您可以根据实际需求选择合适的方案。