"""
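# ClawEmail full-featured email tool: usage guide
## Overview
A Python email tool reverse-engineered from `@clawemail/node-sdk`, covering the full set of operations for Claw (`@claw.163.com`) API mailboxes. Pure Python; the only third-party dependency is `requests`.
## Prerequisites
1. A Claw mailbox account (`xxx@claw.163.com`)
2. The mailbox's authentication URL (of the form `t1/xxxx`)
3. Python 3.8+ with `requests` installed
## Initial setup
```bash
# Put claw_email_tool.py in any directory, then run setup
python3 claw_email_tool.py setup "t1/<your-auth-token>"
# Verify the configuration
python3 claw_email_tool.py folders
```
Configuration is saved to `.claw_accounts.json` in the same directory as the script (`ACCOUNTS_FILE` in the code). Multiple accounts are supported.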
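As a rough illustration of the multi-account file, the sketch below mirrors what `pick_account` in the script does: the file is a JSON object keyed by email address, and the first saved entry is used when no address is given. The sample entries and the `select_account` name are made up for illustration; real entries also carry the IMAP/SMTP host fields written by `setup`.

```python
import json

# Hypothetical contents of .claw_accounts.json (all values are placeholders).
SAMPLE = json.loads('''
{
  "a@claw.163.com": {"email": "a@claw.163.com", "transport": "ws", "uid": "u1", "credential": "KEY"},
  "b@claw.163.com": {"email": "b@claw.163.com", "transport": "imap", "credential": "CODE"}
}
''')

def select_account(accounts, email_addr=None):
    # Return the named account, or the first saved one when no name is given.
    if not accounts:
        raise LookupError("no accounts saved; run setup first")
    if email_addr is not None:
        return accounts[email_addr]
    return next(iter(accounts.values()))

print(select_account(SAMPLE)["transport"])                    # ws
print(select_account(SAMPLE, "b@claw.163.com")["transport"])  # imap
```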
## Command overview
| Command | Description | Example |
|------|------|------|
| `list` | List inbox messages | `list --limit 10` |
| `read` | Read a message body | `read <mid>` |
| `info` | Show message metadata | `info <mid1> <mid2>` |
| `send` | Send a message (attachments supported) | `send --to x@y.com --subj "Hi" --body "content"` |
| `reply` | Reply to a message | `reply <mid> "reply text"` |
| `forward` | Forward a message | `forward <mid> --to x@y.com` |
| `search` | Search messages | `search "keyword" --limit=20` |
| `mark` | Mark as read/unread | `mark <mid> read` |
| `move` | Move a message to a given folder | `move <mid> <fid>` |
| `attach` | Download a message attachment | `attach <mid> <part_id>` |
| `folders` | List all folders | `folders` |
| `watch` | Poll for new mail (blocking) | `watch --interval 30` |
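## Detailed usage
### List messages
```bash
python3 claw_email_tool.py list --limit 5
# Output: ● [mid] date | sender | subject  (● = unread)
```
### Read a message
```bash
python3 claw_email_tool.py read 54:1tbiNht5q2n3DHt4NwAA3Z
# Output: From / To / Subject / Date / body text
```
### Send a message (no attachments)
```bash
python3 claw_email_tool.py send \
  --to "someone@example.com" \
  --subj "Test message" \
  --body "This is the body of a test message"
```
### Send a message (with attachments)
```bash
python3 claw_email_tool.py send \
  --to "someone@example.com" \
  --subj "Message with attachments" \
  --body "Please see the attached files" \
  --attach /path/to/file.pdf /path/to/image.png
```
Multiple attachments are supported, separated by spaces. The values are read in order (recipient, subject, body, then attachment paths). Internal flow: compose(continue) → upload:prepare → upload:directData (multipart/form-data) → compose(deliver).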
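The last step of that flow can be sketched without any network access. The snippet below is a minimal illustration of how the deliver payload might reference uploaded files, assuming each file is identified by the `attachmentId` returned from `upload:prepare`; `build_deliver_payload` is a hypothetical helper, not a function in the script.

```python
def build_deliver_payload(compose_id, attrs, attachment_ids):
    # Build the compose(deliver) payload. Each uploaded file is referenced by
    # the attachmentId from upload:prepare, with type "upload".
    attrs = dict(attrs)  # copy so the caller's dict is not mutated
    if attachment_ids:
        attrs["attachments"] = [{"id": a, "type": "upload"} for a in attachment_ids]
    return {"id": str(compose_id), "action": "deliver", "attrs": attrs}

payload = build_deliver_payload("c-1", {"subject": "Hi"}, [7, 8])
print(payload["attrs"]["attachments"])
# [{'id': 7, 'type': 'upload'}, {'id': 8, 'type': 'upload'}]
```

Copying `attrs` keeps the draft attributes reusable if a delivery has to be retried with a different set of attachments.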
### Reply to a message
```bash
python3 claw_email_tool.py reply <mid> "your reply text"
```
### Forward a message
```bash
# Quoted forward (includes the original text)
python3 claw_email_tool.py forward <mid> --to "someone@example.com" --mode=quote --body="Forwarding note"
# Direct forward
python3 claw_email_tool.py forward <mid> --to "someone@example.com" --mode=direct
```
### Search messages
```bash
python3 claw_email_tool.py search "keyword" --limit=20
# The keyword is matched against the start of the subject line
```
### Mark messages
```bash
python3 claw_email_tool.py mark <mid> read    # mark as read
python3 claw_email_tool.py mark <mid> unread  # mark as unread
# star / unstar are not implemented by ws_cmd_mark in this script
```
### Move messages
```bash
# fid mapping: 1=Inbox, 2=Drafts, 3=Sent, 4=Deleted, 5=Spam
python3 claw_email_tool.py move <mid> 4 # move to Deleted
```
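### Download an attachment
```bash
python3 claw_email_tool.py attach <mid> <part_id>
# Run info <mid> first to find the part number, then download; the file is saved next to the script
```
### Watch for new mail
```bash
python3 claw_email_tool.py watch --interval 30
# Blocks and polls the inbox every 30 seconds, printing a notice for each new message
```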
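The new-mail detection in `watch` comes down to comparing message ids between polls. The sketch below isolates that comparison as a pure function, under the same rule the script uses: the first poll only records ids so that existing mail is not announced as new. `diff_new` is a hypothetical name used for illustration.

```python
def diff_new(previous_ids, messages):
    # Return (new_messages, current_ids). With an empty previous_ids (first
    # poll) nothing is reported; the ids are only recorded.
    current_ids = {m["id"] for m in messages}
    if not previous_ids:
        return [], current_ids
    return [m for m in messages if m["id"] not in previous_ids], current_ids

seen = set()
fresh, seen = diff_new(seen, [{"id": "a"}, {"id": "b"}])
print(fresh)  # []
fresh, seen = diff_new(seen, [{"id": "c"}, {"id": "a"}])
print(fresh)  # [{'id': 'c'}]
```

Because only the most recent page of messages is compared, a burst of mail larger than the page size between two polls could go partly unreported.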
### List folders
```bash
python3 claw_email_tool.py folders
# Output: Inbox(unread=3) / Drafts / Sent / Deleted / Spam
```
## Calling from an agent
```python
import sys
sys.path.insert(0, "/path/to/claw_email_dir")
import claw_email_tool as mod
acct = mod.pick_account()
# List messages
mod.ws_cmd_list(acct, limit=5)
# Send a message with an attachment
mod.ws_cmd_send(acct, to="x@y.com", subj="Hi", body="content", attachments=["/path/to/file"])
# Download an attachment (mid and part_id are taken from the list / info output)
mod.ws_cmd_attach(acct, mid, part_id)
```
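## Modifications (vs. the original SDK)
1. **Attachment download**: the original SDK uses POST + JSON, but the endpoint actually requires GET with a binary stream; `ws_proxy_stream()` fixes this.
2. **Attachment upload**: this is not a plain POST of the binary stream. The correct flow is upload:prepare (returns an attachmentId) -> upload:directData (multipart/form-data) -> compose(deliver), which references the files as attachments:[{id, type:"upload"}].
3. **Pure Python**: no Node.js required; the only dependency is requests.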
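4. **WS mode**: mailbox API calls are sent as JSON requests to the HTTPS proxy endpoint `https://claw.163.com/claw-api-gateway/api/coremail/proxy` (`CLAW_PROXY_URL` in the code), with `uid` and `func` passed as query parameters.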
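To make the download change in item 1 concrete, the sketch below builds the kind of GET URL that `ws_proxy_stream()` ends up requesting, with every argument in the query string instead of a JSON body. The script itself lets `requests` assemble the URL from `params=`; `build_download_url` is a hypothetical helper and the sample ids are placeholders.

```python
from urllib.parse import urlencode, urlsplit, parse_qs

CLAW_PROXY_URL = "https://claw.163.com/claw-api-gateway/api/coremail/proxy"

def build_download_url(uid, mid, part):
    # All arguments travel in the query string; a GET carries no JSON body.
    query = {"uid": uid, "func": "mbox:getMessageData",
             "mid": mid, "part": part, "mode": "download"}
    return CLAW_PROXY_URL + "?" + urlencode(query)

url = build_download_url("user@claw.163.com", "54:abc", "2")
print(parse_qs(urlsplit(url).query)["func"])  # ['mbox:getMessageData']
```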
"""
#!/usr/bin/env python3
"""
ClawEmail full-featured email tool - supports two modes: IMAP and WS (claw API)
Covers the full SDK surface: list/read/send/reply/forward/search/mark/move/folders/attachments
Usage:
  python3 claw_email_tool.py setup "t1/<your-auth-token>"
  python3 claw_email_tool.py list [--limit N]
  python3 claw_email_tool.py read <mid>
  python3 claw_email_tool.py send "to@example.com" "subject" "body"
  python3 claw_email_tool.py reply <mid> "body"
  python3 claw_email_tool.py forward <mid> "to@example.com"
  python3 claw_email_tool.py search "keyword"
  python3 claw_email_tool.py mark <mid> read|unread
  python3 claw_email_tool.py move <mid> <fid>
  python3 claw_email_tool.py folders
  python3 claw_email_tool.py attach <mid> <part>
  python3 claw_email_tool.py watch [--interval N]
  python3 claw_email_tool.py accounts
"""
import sys, os, json, time, re, email as email_lib
from datetime import datetime
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent
ACCOUNTS_FILE = SCRIPT_DIR / ".claw_accounts.json"
CLAW_API_BASE = "https://claw.163.com/claw-api-gateway"
CLAW_PROXY_URL = f"{CLAW_API_BASE}/api/coremail/proxy"

def load_json(p):
    """Load a JSON file, returning {} when it does not exist."""
    if not p.exists(): return {}
    with open(p, encoding="utf-8") as f: return json.load(f)

def save_json(p, d):
    """Write d as UTF-8 JSON (ensure_ascii=False keeps non-ASCII text readable)."""
    with open(p, "w", encoding="utf-8") as f: json.dump(d, f, indent=2, ensure_ascii=False)
# ── WS Proxy ──
def ws_get_token(credential, uid):
    """Exchange the stored credential for an access token."""
    import requests
    r = requests.post(f"{CLAW_API_BASE}/open/v1/mail/auth/token",
                      json={"uid": uid},
                      headers={"Authorization": f"Bearer {credential}", "Content-Type": "application/json"},
                      timeout=15)
    data = r.json()
    if not data.get("success"):
        raise RuntimeError(f"Auth failed: {data}")
    result = data.get("result", {})
    return result.get("accessToken") or result.get("access_token")

def ws_proxy_raw(token, uid, func, payload=None):
    """POST one RPC call to the proxy and return the full JSON response."""
    import requests
    r = requests.post(CLAW_PROXY_URL,
                      json=payload or {},
                      params={"uid": uid, "func": func},
                      headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
                      timeout=30)
    data = r.json()
    if not data.get("success") and data.get("code") != "S_OK":
        raise RuntimeError(f"RPC {func} failed: {data}")
    return data

def ws_proxy(token, uid, func, payload=None):
    """Like ws_proxy_raw, but return only the payload ("result" or "var")."""
    data = ws_proxy_raw(token, uid, func, payload)
    return data.get("result") or data.get("var")

def ws_proxy_stream(token, uid, func, payload=None):
    """HTTP GET + query params for binary stream downloads (attachments).
    Mirrors SDK: GET /proxy?uid=xxx&func=xxx&param1=val1&... with responseType stream."""
    import requests
    params = {"uid": uid, "func": func}
    if payload:
        params.update(payload)
    r = requests.get(CLAW_PROXY_URL,
                     params=params,
                     headers={"Authorization": f"Bearer {token}"},
                     timeout=300,
                     stream=True)
    if r.status_code == 202:
        # Error response is JSON even on stream endpoint
        try:
            err = r.json()
        except ValueError:  # body was not valid JSON
            err = {"code": "UNKNOWN", "body": r.text[:500]}
        raise RuntimeError(f"Stream RPC {func} failed: {err}")
    r.raise_for_status()
    return r

# ── Setup ──
def setup_from_auth_url(auth_url):
    import requests
    if not auth_url.startswith("http"):
        auth_url = "https://pcsetting.mail.163.com/app/auth/index.html?tkn=" + auth_url
    r = requests.post(f"{CLAW_API_BASE}/open/v1/mail/auth/sso",
                      json={"ssoURL": auth_url}, timeout=15)
    data = r.json()
    if data.get("code") != "S_OK":
        print(f"[✗] Auth failed: {data}"); return False
    accts = load_json(ACCOUNTS_FILE)
    for usr in data.get("result", {}).get("userList", []):
        addr = usr.get("mailAddress", "")
        aid = usr.get("id", "")
        acode = usr.get("authCode", "")
        hosts = usr.get("hostSettingList", [])
        imap_h = next((h["setting"] for h in hosts if h.get("settingName") == "imap"), "")
        imap_p = int(next((h["portSetting"] for h in hosts if h.get("settingName") == "imap"), 993))
        smtp_h = next((h["setting"] for h in hosts if h.get("settingName") == "smtp"), "")
        if acode:
            accts[addr] = {
                "email": addr, "credential": acode, "transport": "imap",
                "imap_host": imap_h, "imap_port": imap_p, "smtp_host": smtp_h, "smtp_port": 465,
                "added": datetime.now().isoformat()
            }
            print(f"[✓] {addr} -> IMAP mode (authCode)")
        elif "__apikey__" in usr:
            accts[addr] = {
                "email": addr, "uid": aid, "credential": usr["__apikey__"], "transport": "ws",
                "imap_host": imap_h, "imap_port": imap_p, "smtp_host": smtp_h, "smtp_port": 465,
                "added": datetime.now().isoformat()
            }
            print(f"[✓] {addr} -> WS mode (apikey)")
    save_json(ACCOUNTS_FILE, accts)
    print(f"[✓] Saved to {ACCOUNTS_FILE}")
    return True

# ── IMAP ──
def imap_connect(acct):
    import imaplib
    m = imaplib.IMAP4_SSL(acct["imap_host"], acct.get("imap_port", 993))
    m.login(acct["email"], acct["credential"])
    return m

def imap_list(acct, limit=10):
    m = imap_connect(acct)
    try:
        m.select("INBOX", readonly=True)
        _, nums = m.search(None, "ALL")
        ids = nums[0].split()[-limit:]
        for mid in reversed(ids):
            _, hdrs = m.fetch(mid, "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])")
            h = hdrs[0][1].decode(errors="replace").strip()
            fm = re.search(r"From:\s*(.+)", h, re.I)
            su = re.search(r"Subject:\s*(.+)", h, re.I)
            dt = re.search(r"Date:\s*(.+)", h, re.I)
            print(f"[{mid.decode()}] {dt.group(1).strip() if dt else ''} | {fm.group(1).strip() if fm else '?'} | {su.group(1).strip() if su else ''}")
    finally:
        m.logout()

def imap_read(acct, mid):
    m = imap_connect(acct)
    try:
        m.select("INBOX", readonly=True)
        _, parts = m.fetch(str(mid).encode(), "(RFC822)")
        msg = email_lib.message_from_bytes(parts[0][1])
        print(f"From: {msg.get('From','?')}")
        print(f"To: {msg.get('To','?')}")
        print(f"Subject: {msg.get('Subject','?')}")
        print(f"Date: {msg.get('Date','?')}")
        print("---")
        body = ""
        if msg.is_multipart():
            # Use the first text/plain part as the body
            for part in msg.walk():
                if part.get_content_type() == "text/plain":
                    body = part.get_payload(decode=True).decode(errors="replace"); break
        else:
            body = msg.get_payload(decode=True).decode(errors="replace")
        print(body[:5000])
    finally:
        m.logout()

# ── WS Commands ──
def ws_cmd_folders(acct):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    folders = ws_proxy(token, uid, "mbox:getAllFolders", {"flush": True, "stats": True, "threads": False})
    for f in (folders or []):
        unread = f.get("stats", {}).get("unreadMessageCount", 0) if f.get("stats") else 0
        print(f" [{f['id']}] {f['name']} (unread={unread})")

def ws_cmd_list(acct, limit=10, fid=1):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    data = ws_proxy(token, uid, "mbox:listMessages", {
        "fid": fid, "order": "date", "desc": True, "start": 0, "limit": limit, "mode": 0, "filter": {}
    })
    if not data:
        print(f"[*] Folder fid={fid} empty"); return
    for m in data:
        flags = m.get("flags", {})
        unread_mark = "●" if not flags.get("read") else " "
        print(f"{unread_mark} [{m['id']}] {m.get('receivedDate','')} | {m.get('from','?')} | {m.get('subject','')}")

def ws_cmd_read(acct, mid):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    result = ws_proxy(token, uid, "mbox:readMessage", {"id": mid, "mode": "text"})
    if not result:
        print("[✗] No content"); return
    print(f"From: {result.get('from', '?')}")
    print(f"To: {result.get('to', '?')}")
    print(f"Subject: {result.get('subject', '?')}")
    print(f"Date: {result.get('sentDate', '?')}")
    print("---")
    content = result.get("content", "")
    if not content:
        # Some responses nest the body under "text"
        text_part = result.get("text", {})
        if isinstance(text_part, dict):
            content = text_part.get("content", "")
    print(content[:8000])

def ws_upload_file(token, uid, compose_id, file_path):
    """Upload one attachment (following the SDK: upload:prepare → multipart/form-data upload:directData).
    Returns attachmentId (int)"""
    import requests
    fname = os.path.basename(file_path)
    # Step 1: upload:prepare via JSON RPC
    prep = ws_proxy(token, uid, "upload:prepare", {
        "composeId": compose_id,
        "fileName": fname,
        "contentType": "application/octet-stream"
    })
    att_id = prep.get("attachmentId")
    if att_id is None:
        raise RuntimeError(f"upload:prepare failed: {prep}")
    # Step 2: multipart/form-data POST to upload:directData
    with open(file_path, "rb") as f:
        r = requests.post(CLAW_PROXY_URL,
                          params={"uid": uid, "func": "upload:directData", "composeId": compose_id, "attachmentId": str(att_id)},
                          files={"file": (fname, f, "application/octet-stream")},
                          headers={"Authorization": f"Bearer {token}"},
                          timeout=300)
    data = r.json()
    code = data.get("code")
    if code != "S_OK":
        raise RuntimeError(f"upload:directData failed: {data}")
    return att_id

def ws_cmd_send(acct, to, subj, body, cc=None, attachments=None):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    attrs = {"to": [{"addr": to}], "subject": subj, "content": body, "isHtml": False, "priority": 3, "saveSentCopy": True}
    if cc:
        attrs["cc"] = [{"addr": a.strip()} for a in cc.split(",")]
    # Step 1: compose(continue) → get composeId
    compose_id = ws_proxy(token, uid, "mbox:compose", {"action": "continue", "attrs": attrs})
    if isinstance(compose_id, dict):
        compose_id = compose_id.get("id") or compose_id.get("var") or str(compose_id)
    compose_id = str(compose_id)
    # Step 2: upload attachments if any
    if attachments:
        uploaded_refs = []
        for fp in attachments:
            if os.path.exists(fp):
                att_id = ws_upload_file(token, uid, compose_id, fp)
                # Reference the id returned by upload:prepare, not the loop index
                uploaded_refs.append({"id": att_id, "type": "upload"})
                fsize = os.path.getsize(fp)
                print(f" 📎 Uploaded: {os.path.basename(fp)} (att_id={att_id}, {fsize} bytes)")
            else:
                print(f" [!] File not found: {fp}")
        if uploaded_refs:
            attrs["attachments"] = uploaded_refs
    # Step 3: compose(deliver)
    result = ws_proxy_raw(token, uid, "mbox:compose", {"id": compose_id, "action": "deliver", "attrs": attrs})
    saved = result.get("savedSent") or {}
    mid = saved.get("mid", compose_id)
    print(f"[✓] Sent to {to}, mid={mid}")

def ws_cmd_reply(acct, mid, body, to_all=False, cc=None, with_attachments=False):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    attrs = {"content": body, "isHtml": False, "saveSentCopy": True}
    if cc:
        attrs["cc"] = [{"addr": a.strip()} for a in cc.split(",")]
    payload = {"id": mid, "toAll": to_all, "withAttachments": with_attachments, "action": "continue", "attrs": attrs}
    compose_id = ws_proxy(token, uid, "mbox:replyMessage", payload)
    payload["action"] = "deliver"
    result = ws_proxy_raw(token, uid, "mbox:replyMessage", payload)
    print(f"[✓] Replied to {mid}")

def ws_cmd_forward(acct, mid, to, mode="quote", body=""):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    payload = {
        "ids": [mid], "mode": mode, "to": [{"addr": to}], "saveSentCopy": True,
    }
    if mode == "quote":
        payload["action"] = "continue"
        payload["attrs"] = {"content": body, "isHtml": False, "to": [{"addr": to}], "saveSentCopy": True}
        ws_proxy(token, uid, "mbox:forwardMessages", payload)
        payload["action"] = "deliver"
        result = ws_proxy_raw(token, uid, "mbox:forwardMessages", payload)
    else:
        result = ws_proxy_raw(token, uid, "mbox:forwardMessages", payload)
    print(f"[✓] Forwarded {mid} to {to} (mode={mode})")

def ws_cmd_search(acct, keyword, limit=20, field=None, from_addr=None, subject=None):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    conditions = []
    if keyword:
        conditions.append({"field": "subject", "operator": "startsWith", "value": keyword})
    if from_addr:
        conditions.append({"field": "from", "operator": "startsWith", "value": from_addr})
    if subject:
        conditions.append({"field": "subject", "operator": "startsWith", "value": subject})
    data = ws_proxy(token, uid, "mbox:searchMessages", {
        "conditions": conditions, "order": "date", "desc": True, "start": 0, "limit": limit
    })
    if not data:
        print(f"[*] No results for '{keyword}'"); return
    for m in data:
        if isinstance(m, str):
            # Only an id came back; details need a separate info call
            print(f" [?] [{m}] (use 'info {m}' for details)")
        else:
            flags = m.get("flags", {})
            unread_mark = "●" if not flags.get("read") else " "
            print(f"{unread_mark} [{m['id']}] {m.get('receivedDate','')} | {m.get('from','?')} | {m.get('subject','')}")

def ws_cmd_mark(acct, mid, read_status):
    # Only read/unread are handled; reject anything else instead of silently marking unread
    if read_status not in ("read", "unread"):
        print(f"[!] Unsupported status '{read_status}' (use read or unread)"); return
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    flag = read_status == "read"
    ws_proxy(token, uid, "mbox:updateMessageInfos", {"ids": [mid], "attrs": {"flags": {"read": flag}}})
    print(f"[✓] Marked {mid} as {read_status}")

def ws_cmd_move(acct, mid, fid):
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    ws_proxy(token, uid, "mbox:updateMessageInfos", {"ids": [mid], "attrs": {"fid": int(fid)}})
    print(f"[✓] Moved {mid} to folder {fid}")

def ws_cmd_info(acct, mids):
    """Fetch details for a batch of messages."""
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    data = ws_proxy(token, uid, "mbox:getMessageInfos", {"ids": mids})
    if not data:
        print("[✗] No data"); return
    for m in (data if isinstance(data, list) else [data]):
        flags = m.get("flags", {})
        unread_mark = "●" if not flags.get("read") else " "
        print(f"{unread_mark} [{m['id']}] {m.get('from','?')} | {m.get('subject','')} | {m.get('receivedDate','')}")
        parts = m.get("parts") or []
        for p in parts:
            fname = p.get("filename", "")
            ctype = p.get("contentType", "")
            pnum = p.get("partNumber", "")
            size = p.get("size", "")
            if fname or ctype.startswith("application/"):
                print(f" 📎 Part {pnum}: {fname} ({ctype}, {size} bytes)")

def ws_cmd_attach(acct, mid, part):
    import urllib.parse
    uid = acct.get("uid") or acct["email"]
    token = ws_get_token(acct["credential"], uid)
    # First get filename from readMessage
    result = ws_proxy(token, uid, "mbox:readMessage", {"id": mid, "mode": "text"}) or {}
    parts_info = result.get("parts") or result.get("attachments") or []
    fname = None
    for p in (parts_info if isinstance(parts_info, list) else []):
        pid = str(p.get("id") or p.get("partId") or p.get("partNumber") or "")
        if pid == str(part):
            fname = p.get("filename")
            break
    if not fname:
        fname = f"attachment_{part}"
    # Download via GET stream (matches SDK: GET /proxy?func=mbox:getMessageData&mid=...&part=...&mode=download)
    try:
        resp = ws_proxy_stream(token, uid, "mbox:getMessageData", {"mid": mid, "part": part, "mode": "download"})
        data = resp.content
    except Exception as e:
        print(f"[✗] Download failed: {e}"); return
    if not data:
        print("[✗] No attachment data"); return
    # Try to extract filename from response headers
    cd = resp.headers.get("content-disposition", "")
    if "filename" in cd:
        match = re.search(r'filename\*?=["\']?(?:UTF-8\'\')?([^"\';]+)', cd, re.I)
        if match:
            fname = urllib.parse.unquote(match.group(1))
    # Keep only the last path component so a server-supplied name cannot escape SCRIPT_DIR
    fname = os.path.basename(fname.replace("\\", "/")) or f"attachment_{part}"
    out_path = SCRIPT_DIR / fname
    with open(out_path, "wb") as f:
        f.write(data)
    print(f"[✓] Saved to {out_path} ({len(data)} bytes)")

def ws_cmd_watch(acct, interval=30, fid=1):
    uid = acct.get("uid") or acct["email"]
    print(f"[*] Watching {acct['email']} every {interval}s (Ctrl+C to stop)")
    last_ids = set()
    try:
        while True:
            try:
                # A fresh token is requested on every poll
                token = ws_get_token(acct["credential"], uid)
                data = ws_proxy(token, uid, "mbox:listMessages", {
                    "fid": fid, "order": "date", "desc": True, "start": 0, "limit": 5, "mode": 0, "filter": {}
                })
                if data:
                    current_ids = {m["id"] for m in data}
                    new_ids = current_ids - last_ids
                    if last_ids:  # skip the first poll so existing mail is not reported as new
                        for m in data:
                            if m["id"] in new_ids:
                                print(f"[NEW] {m.get('receivedDate','')} | {m.get('from','?')} | {m.get('subject','')}")
                    last_ids = current_ids
            except Exception as e:
                print(f"[!] Error: {e}")
            time.sleep(interval)
    except KeyboardInterrupt:
        # Also covers Ctrl+C pressed while sleeping
        print("\n[*] Stopped")

# ── Router ──
def pick_account(email_addr=None):
    accts = load_json(ACCOUNTS_FILE)
    if not accts:
        print("[✗] No accounts. Run setup first."); sys.exit(1)
    if email_addr:
        return accts.get(email_addr)
    return list(accts.values())[0]

def _int_opt(flags, positional, key, idx, default):
    """Read an integer from --key=N, else from positional[idx], else default."""
    v = flags.get(key)
    if isinstance(v, str) and v:
        return int(v)
    return int(positional[idx]) if len(positional) > idx else default

def _str_flag(flags, key, default=None):
    """Return the value of --key=value; a bare --key (stored as True) counts as unset."""
    v = flags.get(key)
    return v if isinstance(v, str) else default

def dispatch(acct, cmd, args):
    # args excludes the command name. Flags are --key=value or a bare --key;
    # the token after a bare flag (as in "--limit 5") stays positional.
    flags = {}
    positional = []
    for a in args:
        if a.startswith("--"):
            k, _, v = a[2:].partition("=")
            flags[k] = v if v else True
        else:
            positional.append(a)
    if acct.get("transport") == "imap":
        if cmd == "list": imap_list(acct, _int_opt(flags, positional, "limit", 0, 10))
        elif cmd == "read": imap_read(acct, positional[0])
        else: print("[!] IMAP mode: list/read only. Use WS mode for full features.")
        return
    if cmd == "folders": ws_cmd_folders(acct)
    elif cmd == "list": ws_cmd_list(acct, _int_opt(flags, positional, "limit", 0, 10),
                                    _int_opt(flags, positional, "folder", 1, 1))
    elif cmd == "read": ws_cmd_read(acct, positional[0])
    elif cmd == "info": ws_cmd_info(acct, positional)
    elif cmd == "send": ws_cmd_send(acct, positional[0], positional[1], positional[2] if len(positional) > 2 else "",
                                    cc=_str_flag(flags, "cc"), attachments=positional[3:])
    elif cmd == "reply": ws_cmd_reply(acct, positional[0], positional[1] if len(positional) > 1 else "",
                                      to_all="all" in flags, cc=_str_flag(flags, "cc"), with_attachments="attach" in flags)
    elif cmd == "forward": ws_cmd_forward(acct, positional[0], positional[1], mode=_str_flag(flags, "mode", "quote"),
                                          body=_str_flag(flags, "body", ""))
    elif cmd == "search": ws_cmd_search(acct, positional[0] if positional else "",
                                        limit=_int_opt(flags, positional, "limit", 1, 20),
                                        from_addr=_str_flag(flags, "from"), subject=_str_flag(flags, "subject"))
    elif cmd == "mark": ws_cmd_mark(acct, positional[0], positional[1])
    elif cmd == "move": ws_cmd_move(acct, positional[0], positional[1])
    elif cmd == "attach": ws_cmd_attach(acct, positional[0], positional[1])
    elif cmd == "watch": ws_cmd_watch(acct, _int_opt(flags, positional, "interval", 0, 30))
    else: print(f"[!] Unknown cmd: {cmd}")

def main():
    args = sys.argv[1:]
    if not args:
        print("Usage: claw_email_tool.py <cmd> [args...]")
        print("Commands: setup, accounts, folders, list, info, read, send, reply, forward, search, mark, move, attach, watch")
        return
    cmd = args[0]
    if cmd == "setup":
        if len(args) < 2:
            print("Usage: claw_email_tool.py setup <auth-url-or-token>"); return
        setup_from_auth_url(args[1])
    elif cmd == "accounts":
        accts = load_json(ACCOUNTS_FILE)
        for a in accts.values():
            t = a.get("transport", "imap")
            em = a['email']
            print(f" {em} | {t} | IMAP:{a.get('imap_host','')} SMTP:{a.get('smtp_host','')}")
    elif cmd in ("list", "info", "read", "send", "reply", "forward", "search", "mark", "move", "attach", "watch", "folders"):
        acct = pick_account()
        dispatch(acct, cmd, args[1:])
    else:
        print(f"Unknown: {cmd}")

if __name__ == "__main__":
    main()