package usdtman import ( "context" "encoding/json" "fmt" "io" "net/http" "net/url" "strings" "sync" "time" ) // Config USDTMan 配置 type Config struct { Addresses []string // 监听地址列表 APIKey string // TronGrid API Key QueryInterval time.Duration // 查询间隔(秒),默认 5 秒 MinConfirmations int64 // 最小确认数,默认 6 MaxHistoryTxns int // 每次查询的最大历史交易数,默认 20 ProxyURL string // HTTP/SOCKS5 代理地址,例如 "http://127.0.0.1:7890" 或 "socks5://127.0.0.1:1080" Transport http.RoundTripper // 自定义 Transport(优先级高于 ProxyURL) } // USDTMan USDT 监听管理器 type USDTMan struct { mu sync.RWMutex addresses []string apiKey string queryInterval time.Duration running bool ctx context.Context cancel context.CancelFunc onPaymentComplete func(*USDTPayment) minConfirmations int64 maxHistoryTxns int processedTxns map[string]bool // 已处理的交易 txnMutex sync.RWMutex httpClient *http.Client // HTTP 客户端 } // USDTPayment USDT 收款信息 type USDTPayment struct { Address string Amount float64 TxID string BlockNumber int64 Timestamp int64 From string Confirmations int64 } // TronGridTransaction 交易信息 type TronGridTransaction struct { Ret []map[string]interface{} `json:"ret"` TxID string `json:"txID"` BlockNumber int64 `json:"blockNumber"` BlockTimeStamp int64 `json:"block_timestamp"` RawData struct { Contract []struct { Parameter struct { Value struct { Amount int64 `json:"amount"` To string `json:"to_address"` From string `json:"owner_address"` } `json:"value"` } `json:"parameter"` } `json:"contract"` } `json:"raw_data"` } // TronGridAccountTransactions 账户交易列表 type TronGridAccountTransactions struct { Success bool `json:"success"` Data []TronGridTransaction `json:"data"` Meta struct { PageSize int `json:"page_size"` Fingerprint string `json:"fingerprint"` } `json:"meta"` } // TronGridBlockInfo 区块信息 type TronGridBlockInfo struct { BlockHeader struct { RawData struct { Number int64 `json:"number"` } `json:"raw_data"` } `json:"block_header"` } const ( TronGridAPI = "https://api.trongrid.io" USDTContractAddress = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t" // USDT TRC20 ) // NewUSDTMan 创建监听管理器 func NewUSDTMan(config Config) *USDTMan { ctx, cancel := context.WithCancel(context.Background()) // 设置默认值 if config.QueryInterval == 0 { config.QueryInterval = 5 * time.Second } if config.MinConfirmations == 0 { config.MinConfirmations = 6 } if config.MaxHistoryTxns == 0 { config.MaxHistoryTxns = 20 } // 创建 HTTP 客户端 httpClient := &http.Client{ Timeout: 10 * time.Second, } // 配置代理 if config.Transport != nil { // 使用自定义 Transport httpClient.Transport = config.Transport } else if config.ProxyURL != "" { // 使用代理 URL proxyURL, err := url.Parse(config.ProxyURL) if err == nil { httpClient.Transport = &http.Transport{ Proxy: http.ProxyURL(proxyURL), } } else { fmt.Printf("⚠️ 代理 URL 解析失败: %v,使用直连\n", err) } } return &USDTMan{ addresses: config.Addresses, apiKey: config.APIKey, queryInterval: config.QueryInterval, minConfirmations: config.MinConfirmations, maxHistoryTxns: config.MaxHistoryTxns, httpClient: httpClient, ctx: ctx, cancel: cancel, processedTxns: make(map[string]bool), } } // OnPaymentComplete 设置收款回调 func (m *USDTMan) OnPaymentComplete(callback func(*USDTPayment)) { m.mu.Lock() defer m.mu.Unlock() m.onPaymentComplete = callback } // Start 开始监听 func (m *USDTMan) Start() error { m.mu.Lock() if m.running { m.mu.Unlock() return fmt.Errorf("monitor already running") } m.running = true m.mu.Unlock() go m.monitorLoop() return nil } // Stop 停止监听 func (m *USDTMan) Stop() { m.mu.Lock() if !m.running { m.mu.Unlock() return } m.running = false m.mu.Unlock() m.cancel() } // monitorLoop 监听循环 func (m *USDTMan) monitorLoop() { ticker := time.NewTicker(m.queryInterval) defer ticker.Stop() for { select { case <-m.ctx.Done(): return case <-ticker.C: m.checkAllAddresses() } } } // checkAllAddresses 检查所有地址 func (m *USDTMan) checkAllAddresses() { m.mu.RLock() addresses := make([]string, len(m.addresses)) copy(addresses, m.addresses) m.mu.RUnlock() for _, address := range addresses { m.checkAddress(address) } } // checkAddress 检查单个地址的交易 func (m *USDTMan) checkAddress(address string) { transactions, err := m.getUSDTTransactions(address) if err != nil { fmt.Printf("Error checking address %s: %v\n", address, err) return } currentBlock, err := m.getCurrentBlock() if err != nil { fmt.Printf("Error getting current block: %v\n", err) return } for _, txn := range transactions { // 检查是否已处理 m.txnMutex.RLock() processed := m.processedTxns[txn.TxID] m.txnMutex.RUnlock() if processed { continue } // 计算确认数 confirmations := currentBlock - txn.BlockNumber if confirmations < m.minConfirmations { continue } // 解析交易数据 payment := m.parseTransaction(&txn, address, confirmations) if payment == nil { continue } // 标记为已处理 m.txnMutex.Lock() m.processedTxns[txn.TxID] = true m.txnMutex.Unlock() // 触发回调 m.mu.RLock() callback := m.onPaymentComplete m.mu.RUnlock() if callback != nil { callback(payment) } } } // getUSDTTransactions 获取地址的 USDT 交易 func (m *USDTMan) getUSDTTransactions(address string) ([]TronGridTransaction, error) { url := fmt.Sprintf("%s/v1/accounts/%s/transactions/trc20?limit=%d&contract_address=%s&only_to=true", TronGridAPI, address, m.maxHistoryTxns, USDTContractAddress) req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err } if m.apiKey != "" { req.Header.Set("TRON-PRO-API-KEY", m.apiKey) } resp, err := m.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } if resp.StatusCode != 200 { return nil, fmt.Errorf("TronGrid API error: %d %s", resp.StatusCode, string(body)) } var result TronGridAccountTransactions if err := json.Unmarshal(body, &result); err != nil { return nil, err } return result.Data, nil } // getCurrentBlock 获取当前区块高度 func (m *USDTMan) getCurrentBlock() (int64, error) { url := fmt.Sprintf("%s/wallet/getnowblock", TronGridAPI) req, err := http.NewRequest("GET", url, nil) if err != nil { return 0, err } if m.apiKey != "" { req.Header.Set("TRON-PRO-API-KEY", m.apiKey) } resp, err := m.httpClient.Do(req) if err != nil { return 0, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return 0, err } if resp.StatusCode != 200 { return 0, fmt.Errorf("TronGrid API error: %d", resp.StatusCode) } var blockInfo TronGridBlockInfo if err := json.Unmarshal(body, &blockInfo); err != nil { return 0, err } return blockInfo.BlockHeader.RawData.Number, nil } // parseTransaction 解析交易 func (m *USDTMan) parseTransaction(txn *TronGridTransaction, targetAddress string, confirmations int64) *USDTPayment { if len(txn.Ret) == 0 || txn.Ret[0]["contractRet"] != "SUCCESS" { return nil } if len(txn.RawData.Contract) == 0 { return nil } contract := txn.RawData.Contract[0] value := contract.Parameter.Value // 转换地址格式 to := value.To from := value.From // 检查是否是目标地址 if !strings.EqualFold(to, targetAddress) { return nil } // USDT 是 6 位小数 amount := float64(value.Amount) / 1000000.0 return &USDTPayment{ Address: targetAddress, Amount: amount, TxID: txn.TxID, BlockNumber: txn.BlockNumber, Timestamp: txn.BlockTimeStamp, From: from, Confirmations: confirmations, } } // GetAddresses 获取所有监听地址 func (m *USDTMan) GetAddresses() []string { m.mu.RLock() defer m.mu.RUnlock() addresses := make([]string, len(m.addresses)) copy(addresses, m.addresses) return addresses } // AddAddress 添加监听地址 func (m *USDTMan) AddAddress(address string) { m.mu.Lock() defer m.mu.Unlock() // 检查是否已存在 for _, addr := range m.addresses { if addr == address { return } } m.addresses = append(m.addresses, address) } // RemoveAddress 移除监听地址 func (m *USDTMan) RemoveAddress(address string) { m.mu.Lock() defer m.mu.Unlock() for i, addr := range m.addresses { if addr == address { m.addresses = append(m.addresses[:i], m.addresses[i+1:]...) return } } }