#!/usr/bin/env python3
"""
magnet_dl.py - 命令行 Magnet/Torrent 下载工具(aria2c 包装)
支持选择性下载种子中的部分文件
用法:
magnet_dl "magnet:?xt=urn:btih:xxxxx"
magnet_dl "magnet:?xt=urn:btih:xxxxx" -d ./downloads
magnet_dl "magnet:?xt=urn:btih:xxxxx" -s 1,3,5
magnet_dl "magnet:?xt=urn:btih:xxxxx" --list
magnet_dl file.torrent -s 1-3,5
"""
import subprocess, sys, os, argparse, tempfile, glob, re
# Prefer an aria2c.exe sitting next to this script; otherwise fall back to
# whatever "aria2c" resolves to on PATH.
_bundled_aria2c = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "aria2c.exe"
)
ARIA2C = _bundled_aria2c if os.path.exists(_bundled_aria2c) else "aria2c"
# Tracker announce URLs, joined into the single comma-separated string that
# COMMON_ARGS passes to aria2c through --bt-tracker.
_TRACKER_URLS = (
    "udp://tracker.opentrackr.org:1337/announce",
    "udp://open.stealth.si:80/announce",
    "udp://tracker.torrent.eu.org:451/announce",
    "udp://public.popcorn-tracker.org:6969/announce",
    "udp://tracker.dler.org:6969/announce",
    "udp://exodus.desync.com:6969/announce",
    "udp://open.demonii.com:1337/announce",
    "udp://tracker.openbittorrent.com:6969/announce",
    "udp://tracker.moeking.me:6969/announce",
    "udp://tracker.bittor.pw:1337/announce",
    "udp://opentracker.i2p.rocks:6969/announce",
    "udp://tracker.tiny-vps.com:6969/announce",
    "udp://p4p.arenabg.com:1337/announce",
    "http://tracker.openbittorrent.com:80/announce",
    "https://opentracker.i2p.rocks:443/announce",
)
TRACKERS = ",".join(_TRACKER_URLS)
# aria2c options shared by every invocation that fetches data (both the
# metadata-only run and the real download). Order is preserved as-is.
COMMON_ARGS = [
    # Peer discovery and listening ports.
    "--enable-dht=true",
    "--enable-peer-exchange=true",
    "--dht-listen-port=6881",
    "--listen-port=6881",
    "--bt-enable-lpd=true",
    # Seeding.
    "--seed-time=0",
    # Connection / peer tuning.
    "--max-connection-per-server=16",
    "--split=16",
    "--min-split-size=1M",
    "--bt-max-peers=100",
    "--bt-request-peer-speed-limit=10M",
    # Extra trackers and DHT bootstrap nodes.
    "--bt-tracker=" + TRACKERS,
    "--dht-entry-point=router.bittorrent.com:6881",
    "--dht-entry-point=router.utorrent.com:6881",
    "--dht-entry-point=dht.transmissionbt.com:6881",
    # Progress output and on-disk handling.
    "--summary-interval=5",
    "--file-allocation=falloc",
    "--check-integrity=true",
    "--bt-remove-unselected-file=true",
]
def is_magnet(s):
    """Return True when *s* begins with the ``magnet:`` prefix (case-sensitive)."""
    scheme_prefix = "magnet:"
    return s.startswith(scheme_prefix)
def get_torrent_from_magnet(magnet, tmpdir):
    """Fetch only the torrent metadata for *magnet* and save it into *tmpdir*.

    Runs aria2c in metadata-only mode with a 120-second limit. A timeout or
    Ctrl-C is reported on stdout but not raised; in every case *tmpdir* is
    then searched for a ``.torrent`` file.

    Returns:
        Path of the first ``*.torrent`` found in *tmpdir*, or None when the
        directory contains none.
    """
    print("[*] 获取元数据中 (加载了15个tracker + DHT)...")
    metadata_cmd = [
        ARIA2C,
        "--bt-metadata-only=true",
        "--bt-save-metadata=true",
        f"--dir={tmpdir}",
    ]
    metadata_cmd.extend(COMMON_ARGS)
    metadata_cmd.append(magnet)
    try:
        subprocess.run(metadata_cmd, timeout=120)
    except subprocess.TimeoutExpired:
        print("[!] 元数据获取超时(120s)")
    except KeyboardInterrupt:
        print("[!] 用户中断")

    # Whatever happened above, use a saved .torrent if one exists.
    found = glob.glob(os.path.join(tmpdir, "*.torrent"))
    if not found:
        return None
    return found[0]
def show_files(torrent_path):
    """Print aria2c's ``--show-files`` output for *torrent_path* and return it.

    The output is decoded as UTF-8 with undecodable bytes replaced.
    """
    listing = subprocess.run(
        [ARIA2C, "--show-files", torrent_path],
        capture_output=True,
        text=True,
        encoding='utf-8',
        errors='replace',
    ).stdout
    print(listing)
    return listing
def parse_selection(s, max_n):
    """Parse a selection such as ``"1-3,5"`` into a sorted list of file numbers.

    Args:
        s: Comma-separated items; each is a single number or an inclusive
            ``a-b`` range. Blank items are ignored. A reversed range
            (``5-3``) is treated the same as ``3-5``.
        max_n: Highest valid file number; numbers outside 1..max_n are dropped.

    Returns:
        Sorted list of unique integers within 1..max_n (possibly empty).

    Raises:
        ValueError: if an item is neither a number nor a well-formed range.
    """
    chosen = set()
    for raw in s.split(','):
        item = raw.strip()
        if not item:
            continue
        try:
            if '-' in item:
                left, right = item.split('-', 1)
                lo, hi = int(left), int(right)
                if lo > hi:
                    # Accept "5-3" instead of silently selecting nothing.
                    lo, hi = hi, lo
            else:
                lo = hi = int(item)
        except ValueError:
            raise ValueError(f"invalid selection item: {item!r}") from None
        # Clamp BEFORE expanding so an enormous range ("1-999999999999")
        # never gets materialised in memory.
        chosen.update(range(max(lo, 1), min(hi, max_n) + 1))
    return sorted(chosen)
def download(target, save_path, select=None):
    """Run aria2c to download *target* into *save_path*.

    When *select* is non-empty, its numbers are passed to aria2c through
    ``--select-file``; otherwise no file filter is given. Ctrl-C during the
    download is reported on stdout instead of propagating.
    """
    argv = [ARIA2C, f"--dir={save_path}"]
    argv.extend(COMMON_ARGS)
    if select:
        chosen = ",".join(map(str, select))
        argv.append(f"--select-file={chosen}")
    argv.append(target)

    print(f"\n[*] 下载到: {save_path}")
    if select:
        print(f"[*] 已选文件: {select}")
    print("=" * 60)

    try:
        subprocess.run(argv)
    except KeyboardInterrupt:
        print("\n[!] 用户中断")
def _select_or_report(spec, max_n):
    """Parse *spec* into file numbers; print the reason and return None if unusable."""
    try:
        picked = parse_selection(spec, max_n)
    except ValueError as exc:
        print(f"[!] 无效的文件编号 {spec!r}: {exc}")
        return None
    if not picked:
        # An empty selection must not fall through to "download everything".
        print(f"[!] 没有匹配的文件编号: {spec!r}")
        return None
    return picked


def main():
    """Command-line entry point: parse arguments and drive aria2c.

    Flow:
      1. For a magnet link combined with --list, -i or -s, fetch the
         .torrent metadata first so the file list is available. If that
         fails, fall back to a plain download (except with --list).
      2. With --list or -i, print the file list; -i then prompts for the
         numbers to download (empty input = everything, q = quit).
      3. Otherwise download the target, restricted to -s when given.

    A malformed selection, or one that matches no file number, is reported
    and nothing is downloaded.
    """
    ap = argparse.ArgumentParser(description='命令行 Magnet/Torrent 下载 (aria2c)')
    ap.add_argument('target', help='Magnet 链接或 .torrent 文件')
    ap.add_argument('-d', '--dir', default='.', help='下载目录 (默认.)')
    ap.add_argument('-s', '--select', default='', help='文件编号: 1,3,5 或 1-3,5')
    ap.add_argument('--list', action='store_true', help='仅列出文件')
    ap.add_argument('-i', '--interactive', action='store_true', help='交互选择文件')
    args = ap.parse_args()

    save_path = os.path.abspath(args.dir)
    os.makedirs(save_path, exist_ok=True)
    target = args.target

    # Magnet link + file information needed -> fetch the metadata first.
    if is_magnet(target) and (args.list or args.interactive or args.select):
        # NOTE(review): this temp directory is never removed. It is left in
        # place because the .torrent path printed below may be reused by the
        # user in a later run — confirm whether cleanup is wanted.
        tmpdir = tempfile.mkdtemp(prefix="magnet_dl_")
        torrent = get_torrent_from_magnet(target, tmpdir)
        if not torrent:
            print("[!] 无法获取元数据" + ("" if args.list else ",尝试直接下载..."))
            if not args.list:
                download(target, save_path)
            return
        print(f"[OK] 元数据: {torrent}")
        target = torrent

    if args.list or args.interactive:
        output = show_files(target)
        if args.list:
            return
        # File rows in the listing start with "<number>|"; the largest number
        # found is used as the upper bound for the interactive selection.
        indices = re.findall(r'^\s*(\d+)\|', output, re.MULTILINE)
        max_n = max((int(x) for x in indices), default=0)
        if max_n > 0:
            print("输入编号 (1-3,5,7) 回车=全部 q=退出")
            try:
                c = input("> ").strip()
            except (EOFError, KeyboardInterrupt):
                return
            if c.lower() == 'q':
                return
            if c:
                picked = _select_or_report(c, max_n)
                if picked is not None:
                    download(target, save_path, picked)
                return
        # No parsable file list, or the user pressed Enter: download everything.
        download(target, save_path)
        return

    if args.select:
        # The real file count is not known on this path; 9999 is only an
        # upper bound for the numbers accepted.
        picked = _select_or_report(args.select, 9999)
        if picked is not None:
            download(target, save_path, picked)
        return

    download(target, save_path)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
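# Web-page residue from where this script was copied (not part of the program):
# "Comments (0)" / "Log in to post a comment." / "No comments yet."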