#!/usr/bin/env python3 """wallets.json 图形管理(PySide6)。默认同目录下 servers/walletman/cmd/wallets.json。""" from __future__ import annotations import json import sys from pathlib import Path from typing import Any from PySide6.QtCore import Qt from PySide6.QtGui import QAction, QCloseEvent, QKeySequence from PySide6.QtWidgets import ( QApplication, QComboBox, QDialog, QDialogButtonBox, QFileDialog, QFormLayout, QHBoxLayout, QHeaderView, QLabel, QLineEdit, QMainWindow, QMessageBox, QPushButton, QTableWidget, QTableWidgetItem, QTextEdit, QVBoxLayout, QWidget, ) # 与 servers/walletman/cmd/tools/main.go 中 walletTypeMenu 一致 WALLET_TYPES = [ "paytm", "phonepe", "mobikwik", "freecharge", "bharatpe business", "paytm business", "phonepe business", "googlepay business", ] def default_wallets_path() -> Path: root = Path(__file__).resolve().parent for rel in ( root / "servers" / "walletman" / "cmd" / "wallets.json", root / "wallets.json", ): if rel.is_file(): return rel return root / "servers" / "walletman" / "cmd" / "wallets.json" def parse_params(s: str) -> dict[str, Any]: s = s.strip() if not s: raise ValueError("params 为空") return json.loads(s) def format_params_pretty(s: str) -> str: s = s.strip() if not s: return "" return json.dumps(json.loads(s), ensure_ascii=False, indent=2) def compact_params_json(s: str) -> str: return json.dumps(json.loads(s), ensure_ascii=False, separators=(",", ":")) def suggest_wallet_id(wallet_type: str, params: str) -> str: try: p = json.loads(params) except json.JSONDecodeError: return "" mob = p.get("mobile") if isinstance(mob, str) and mob: return f"{wallet_type}_{mob}" return "" class WalletEditDialog(QDialog): def __init__( self, parent: QWidget | None, title: str, wallet_type: str, wallet_id: str, params: str, allow_change_id: bool = True, *, is_add: bool = False, ) -> None: super().__init__(parent) self.setWindowTitle(title) self.setMinimumSize(640, 480) self._is_add = is_add self._id_locked = not allow_change_id and bool(wallet_id) layout = QVBoxLayout(self) form = QFormLayout() self.edit_uid: QLineEdit | None = None if is_add: self.edit_uid = QLineEdit("10000") self.edit_uid.setPlaceholderText("如 10000") form.addRow("用户", self.edit_uid) self.combo_type = QComboBox() self.combo_type.setEditable(True) for t in WALLET_TYPES: self.combo_type.addItem(t) idx = self.combo_type.findText(wallet_type, Qt.MatchFlag.MatchFixedString) if idx >= 0: self.combo_type.setCurrentIndex(idx) else: self.combo_type.setCurrentText(wallet_type) self.edit_id = QLineEdit(wallet_id) self.edit_id.setPlaceholderText("留空则根据 params 里 mobile 自动生成") if self._id_locked: self.edit_id.setReadOnly(True) form.addRow("类型", self.combo_type) form.addRow("钱包 key", self.edit_id) layout.addLayout(form) self.text_params = QTextEdit() self.text_params.setFontFamily("Menlo" if sys.platform == "darwin" else "Consolas") self.text_params.setPlainText(params) layout.addWidget(QLabel("params (JSON)")) layout.addWidget(self.text_params, 1) box = QDialogButtonBox( QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel ) box.accepted.connect(self._accept) box.rejected.connect(self.reject) layout.addWidget(box) def _accept(self) -> None: ptxt = self.text_params.toPlainText().strip() try: parse_params(ptxt) except (json.JSONDecodeError, ValueError) as e: QMessageBox.warning(self, "JSON 无效", str(e)) return self._ok = True self.accept() @property def user_id(self) -> str: if not self._is_add or self.edit_uid is None: return "10000" return self.edit_uid.text().strip() or "10000" return self.combo_type.currentText().strip() @property def wallet_id(self) -> str: return self.edit_id.text().strip() @property def params_compact(self) -> str: return compact_params_json(self.text_params.toPlainText()) class WalletsWindow(QMainWindow): COL_USER, COL_ID, COL_TYPE, COL_PREVIEW = 0, 1, 2, 3 def __init__(self) -> None: super().__init__() self.setWindowTitle("Wallets 管理器") self.resize(1100, 600) self._path: Path = default_wallets_path() self._data: dict[str, dict[str, dict[str, str]]] = {} self._dirty = False central = QWidget() self.setCentralWidget(central) v = QVBoxLayout(central) path_row = QHBoxLayout() self.label_path = QLabel() path_row.addWidget(self.label_path) btn_browse = QPushButton("打开…") btn_browse.clicked.connect(self._open_file) path_row.addWidget(btn_browse) v.addLayout(path_row) self.table = QTableWidget(0, 4) self.table.setHorizontalHeaderLabels(["用户", "Wallet ID", "类型", "params 摘要"]) self.table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch) self.table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows) self.table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers) self.table.doubleClicked.connect(self._on_double_click) v.addWidget(self.table) row = QHBoxLayout() for text, fn in [ ("重新加载", self._reload), ("保存", self._save), ("添加", self._add), ("编辑", self._edit), ("删除", self._delete), ]: b = QPushButton(text) b.clicked.connect(fn) row.addWidget(b) v.addLayout(row) self._update_path_label() self._build_menu() self.statusBar().showMessage("就绪") self._load_file(self._path, quiet=True) def _build_menu(self) -> None: m = self.menuBar().addMenu("文件") a_open = QAction("打开…", self) a_open.setShortcut(QKeySequence.StandardKey.Open) a_open.triggered.connect(self._open_file) m.addAction(a_open) a_save = QAction("保存", self) a_save.setShortcut(QKeySequence.StandardKey.Save) a_save.triggered.connect(self._save) m.addAction(a_save) m.addSeparator() a_exit = QAction("退出", self) a_exit.setShortcut(QKeySequence.StandardKey.Quit) a_exit.triggered.connect(self.close) m.addAction(a_exit) def _update_path_label(self) -> None: self.label_path.setText(f"文件: {self._path} " + ("● 未保存" if self._dirty else "")) self.label_path.setTextInteractionFlags( Qt.TextInteractionFlag.TextSelectableByMouse ) def _set_dirty(self, d: bool) -> None: self._dirty = d self._update_path_label() def _load_file(self, path: Path, quiet: bool = False) -> None: if not path.is_file(): self._data = {} self._rebuild_table() self._set_dirty(False) if not quiet: QMessageBox.information(self, "提示", f"文件不存在,将使用空数据:\n{path}") return try: raw = path.read_text(encoding="utf-8") self._data = json.loads(raw) except (OSError, json.JSONDecodeError) as e: QMessageBox.critical(self, "读取失败", str(e)) return self._path = path.resolve() self._rebuild_table() self._set_dirty(False) self.setWindowTitle(f"Wallets 管理器 — {self._path.name}") def _reload(self) -> None: if self._dirty: r = QMessageBox.question( self, "未保存", "是否丢弃未保存的修改?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, ) if r != QMessageBox.StandardButton.Yes: return self._load_file(self._path, quiet=True) def _save(self) -> None: self._path.parent.mkdir(parents=True, exist_ok=True) try: text = json.dumps(self._data, ensure_ascii=False, indent=2) + "\n" self._path.write_text(text, encoding="utf-8") except OSError as e: QMessageBox.critical(self, "保存失败", str(e)) return self._set_dirty(False) QMessageBox.information(self, "已保存", str(self._path)) def _open_file(self) -> None: if self._dirty: r = QMessageBox.question( self, "未保存", "是否先保存?", QMessageBox.StandardButton.Save | QMessageBox.StandardButton.Discard | QMessageBox.StandardButton.Cancel, ) if r == QMessageBox.StandardButton.Save: self._save() elif r == QMessageBox.StandardButton.Cancel: return path, _ = QFileDialog.getOpenFileName( self, "选择 wallets.json", str(self._path.parent), "JSON (*.json);;*.*" ) if path: self._load_file(Path(path)) def _rebuild_table(self) -> None: rows: list[tuple[str, str, str, str]] = [] for uid, wallets in self._data.items(): for wid, ent in wallets.items(): wt = ent.get("walletType", "") p = ent.get("params", "") prev = p[:80] + ("…" if len(p) > 80 else "") rows.append((str(uid), wid, str(wt), prev)) rows.sort(key=lambda x: (x[0], x[1])) self.table.setRowCount(len(rows)) for i, (uid, wid, wt, prev) in enumerate(rows): self.table.setItem(i, self.COL_USER, QTableWidgetItem(uid)) self.table.setItem(i, self.COL_ID, QTableWidgetItem(wid)) self.table.setItem(i, self.COL_TYPE, QTableWidgetItem(wt)) self.table.setItem(i, self.COL_PREVIEW, QTableWidgetItem(prev)) def _selected_cell(self) -> tuple[int, int] | None: idx = self.table.currentIndex() if not idx.isValid(): return None return idx.row(), idx.column() def _row_to_keys(self, row: int) -> tuple[str, str] | None: u = self.table.item(row, self.COL_USER) w = self.table.item(row, self.COL_ID) if not u or not w: return None return u.text(), w.text() def _on_double_click(self) -> None: self._edit() def _add(self) -> None: default_type = WALLET_TYPES[0] dlg = WalletEditDialog(self, "添加钱包", default_type, "", "{}", allow_change_id=True, is_add=True) if dlg.exec() != QDialog.DialogCode.Accepted: return uid = dlg.user_id wt, pid, pcompact = dlg.wallet_type, dlg.wallet_id, dlg.params_compact if not pid: pid = suggest_wallet_id(wt, pcompact) if not pid: QMessageBox.warning(self, "错误", "无法生成 Wallet ID,请填写「钱包 key」或确保 params 含 mobile") return if uid not in self._data: self._data[uid] = {} existed = pid in self._data[uid] self._data[uid][pid] = {"walletType": wt, "params": pcompact} self._rebuild_table() self._set_dirty(True) if existed: self.statusBar().showMessage(f"已更新(原记录已存在): {uid} / {pid}", 5000) else: self.statusBar().showMessage(f"已添加: {uid} / {pid}", 5000) def _edit(self) -> None: sel = self._selected_cell() if not sel: QMessageBox.information(self, "提示", "先选中一行") return row = sel[0] keys = self._row_to_keys(row) if not keys: return uid, wid = keys ent = self._data.get(uid, {}).get(wid) if not ent: return wt = ent["walletType"] try: pretty = format_params_pretty(ent["params"]) except (json.JSONDecodeError, TypeError): pretty = str(ent.get("params", "")) dlg = WalletEditDialog(self, "编辑钱包", wt, wid, pretty, allow_change_id=False, is_add=False) if dlg.exec() != QDialog.DialogCode.Accepted: return nwt, _, pcompact = dlg.wallet_type, dlg.wallet_id, dlg.params_compact self._data[uid][wid] = {"walletType": nwt, "params": pcompact} self._rebuild_table() self._set_dirty(True) def _delete(self) -> None: sel = self._selected_cell() if not sel: return keys = self._row_to_keys(sel[0]) if not keys: return uid, wid = keys if ( QMessageBox.question( self, "删除", f"确定删除 {uid} / {wid} ?", ) != QMessageBox.StandardButton.Yes ): return self._data.get(uid, {}).pop(wid, None) if self._data.get(uid) == {}: self._data.pop(uid, None) self._rebuild_table() self._set_dirty(True) def closeEvent(self, event: QCloseEvent) -> None: if self._dirty: r = QMessageBox.question( self, "未保存", "是否保存后退出?", QMessageBox.StandardButton.Save | QMessageBox.StandardButton.Discard | QMessageBox.StandardButton.Cancel, ) if r == QMessageBox.StandardButton.Save: self._save() elif r == QMessageBox.StandardButton.Cancel: event.ignore() return event.accept() def main() -> None: app = QApplication(sys.argv) w = WalletsWindow() w.show() sys.exit(app.exec()) if __name__ == "__main__": main()