package usdtman import ( "context" "encoding/json" "fmt" "io" "math/big" "net/http" "net/url" "strings" "sync" "time" ) // Config USDTMan 配置 type Config struct { Addresses []string // 监听地址列表 APIKey string // TronGrid API Key QueryInterval time.Duration // 查询间隔(秒),默认 5 秒 MinConfirmations int64 // 最小确认数,默认 6 MaxHistoryTxns int // 每次查询的最大历史交易数,默认 20 ProxyURL string // HTTP/SOCKS5 代理地址,例如 "http://127.0.0.1:7890" 或 "socks5://127.0.0.1:1080" Transport http.RoundTripper // 自定义 Transport(优先级高于 ProxyURL) } // USDTMan USDT 监听管理器 type USDTMan struct { mu sync.RWMutex addresses []string apiKey string queryInterval time.Duration running bool ctx context.Context cancel context.CancelFunc onPaymentComplete func(*USDTPayment) minConfirmations int64 maxHistoryTxns int processedTxns map[string]bool // 已处理的交易 txnMutex sync.RWMutex httpClient *http.Client // HTTP 客户端 } // QueryFilter 查询过滤条件 type QueryFilter struct { Address string // 地址(必填) StartTime int64 // 开始时间戳(毫秒),0 表示不限制 EndTime int64 // 结束时间戳(毫秒),0 表示不限制 MinAmount *big.Int // 最小金额(micro USDT),nil 表示不限制 MinConfirmations int64 // 最小确认数,0 表示不限制 Limit int // 查询数量限制,0 使用默认值 } // USDTPayment USDT 收款信息 type USDTPayment struct { Address string Amount *big.Int // 原始金额(micro USDT,10^-6),例如 13260000 = 13.26 USDT TxID string BlockNumber int64 Timestamp int64 From string Confirmations int64 } // GetAmountFloat 获取浮点数金额(USDT) func (p *USDTPayment) GetAmountFloat() float64 { divisor := new(big.Float).SetInt64(1000000) amount := new(big.Float).SetInt(p.Amount) result := new(big.Float).Quo(amount, divisor) f, _ := result.Float64() return f } // GetAmountString 获取字符串金额(USDT,保留6位小数) func (p *USDTPayment) GetAmountString() string { return fmt.Sprintf("%.6f", p.GetAmountFloat()) } // MarshalJSON 自定义 JSON 序列化 func (p *USDTPayment) MarshalJSON() ([]byte, error) { return json.Marshal(struct { Address string `json:"Address"` Amount float64 `json:"Amount"` AmountRaw string `json:"AmountRaw"` TxID string `json:"TxID"` BlockNumber int64 `json:"BlockNumber"` Timestamp int64 `json:"Timestamp"` From string `json:"From"` Confirmations int64 `json:"Confirmations"` }{ Address: p.Address, Amount: p.GetAmountFloat(), AmountRaw: p.Amount.String(), TxID: p.TxID, BlockNumber: p.BlockNumber, Timestamp: p.Timestamp, From: p.From, Confirmations: p.Confirmations, }) } // TronGridTransaction TRC20 交易信息 type TronGridTransaction struct { TransactionID string `json:"transaction_id"` BlockTimestamp int64 `json:"block_timestamp"` From string `json:"from"` To string `json:"to"` Type string `json:"type"` Value string `json:"value"` TokenInfo struct { Symbol string `json:"symbol"` Address string `json:"address"` Decimals int `json:"decimals"` Name string `json:"name"` } `json:"token_info"` } // TronGridAccountTransactions 账户交易列表 type TronGridAccountTransactions struct { Success bool `json:"success"` Data []TronGridTransaction `json:"data"` Meta struct { At int64 `json:"at"` PageSize int `json:"page_size"` } `json:"meta"` } // TronGridBlockInfo 区块信息 type TronGridBlockInfo struct { BlockHeader struct { RawData struct { Number int64 `json:"number"` } `json:"raw_data"` } `json:"block_header"` } const ( TronGridAPI = "https://api.trongrid.io" USDTContractAddress = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t" // USDT TRC20 ) // NewUSDTMan 创建监听管理器 func NewUSDTMan(config Config) *USDTMan { ctx, cancel := context.WithCancel(context.Background()) // 设置默认值 if config.QueryInterval == 0 { config.QueryInterval = 5 * time.Second } if config.MinConfirmations == 0 { config.MinConfirmations = 6 } if config.MaxHistoryTxns == 0 { config.MaxHistoryTxns = 20 } // 创建 HTTP 客户端 httpClient := &http.Client{ Timeout: 10 * time.Second, } // 配置代理 if config.Transport != nil { // 使用自定义 Transport httpClient.Transport = config.Transport } else if config.ProxyURL != "" { // 使用代理 URL proxyURL, err := url.Parse(config.ProxyURL) if err == nil { httpClient.Transport = &http.Transport{ Proxy: http.ProxyURL(proxyURL), } } else { fmt.Printf("⚠️ 代理 URL 解析失败: %v,使用直连\n", err) } } return &USDTMan{ addresses: config.Addresses, apiKey: config.APIKey, queryInterval: config.QueryInterval, minConfirmations: config.MinConfirmations, maxHistoryTxns: config.MaxHistoryTxns, httpClient: httpClient, ctx: ctx, cancel: cancel, processedTxns: make(map[string]bool), } } // OnPaymentComplete 设置收款回调 func (m *USDTMan) OnPaymentComplete(callback func(*USDTPayment)) { m.mu.Lock() defer m.mu.Unlock() m.onPaymentComplete = callback } // Start 开始监听 func (m *USDTMan) Start() error { m.mu.Lock() if m.running { m.mu.Unlock() return fmt.Errorf("monitor already running") } m.running = true m.mu.Unlock() go m.monitorLoop() return nil } // Stop 停止监听 func (m *USDTMan) Stop() { m.mu.Lock() if !m.running { m.mu.Unlock() return } m.running = false m.mu.Unlock() m.cancel() } // monitorLoop 监听循环 func (m *USDTMan) monitorLoop() { ticker := time.NewTicker(m.queryInterval) defer ticker.Stop() for { select { case <-m.ctx.Done(): return case <-ticker.C: m.checkAllAddresses() } } } // checkAllAddresses 检查所有地址 func (m *USDTMan) checkAllAddresses() { m.mu.RLock() addresses := make([]string, len(m.addresses)) copy(addresses, m.addresses) m.mu.RUnlock() for _, address := range addresses { m.checkAddress(address) } } // checkAddress 检查单个地址的交易 func (m *USDTMan) checkAddress(address string) { transactions, err := m.getUSDTTransactions(address) if err != nil { fmt.Printf("Error checking address %s: %v\n", address, err) return } currentBlock, err := m.getCurrentBlock() if err != nil { fmt.Printf("Error getting current block: %v\n", err) return } for _, txn := range transactions { // 检查是否已处理 m.txnMutex.RLock() processed := m.processedTxns[txn.TransactionID] m.txnMutex.RUnlock() if processed { continue } // 获取交易的区块号(需要通过 transaction_id 查询) blockNumber, err := m.getTransactionBlock(txn.TransactionID) if err != nil { fmt.Printf("Error getting transaction block: %v\n", err) continue } // 计算确认数 confirmations := currentBlock - blockNumber if confirmations < m.minConfirmations { continue } // 检查是否是转入该地址 if !strings.EqualFold(txn.To, address) { continue } // 解析金额(使用 big.Int) amount := new(big.Int) amount, ok := amount.SetString(txn.Value, 10) if !ok || amount.Sign() <= 0 { continue } payment := &USDTPayment{ Address: address, Amount: amount, TxID: txn.TransactionID, BlockNumber: blockNumber, Timestamp: txn.BlockTimestamp, From: txn.From, Confirmations: confirmations, } // 标记为已处理 m.txnMutex.Lock() m.processedTxns[txn.TransactionID] = true m.txnMutex.Unlock() // 触发回调 m.mu.RLock() callback := m.onPaymentComplete m.mu.RUnlock() if callback != nil { callback(payment) } } } // getUSDTTransactions 获取地址的 USDT 交易 func (m *USDTMan) getUSDTTransactions(address string) ([]TronGridTransaction, error) { url := fmt.Sprintf("%s/v1/accounts/%s/transactions/trc20?limit=%d&contract_address=%s&only_to=true", TronGridAPI, address, m.maxHistoryTxns, USDTContractAddress) req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err } if m.apiKey != "" { req.Header.Set("TRON-PRO-API-KEY", m.apiKey) } resp, err := m.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } if resp.StatusCode != 200 { return nil, fmt.Errorf("TronGrid API error: %d %s", resp.StatusCode, string(body)) } var result TronGridAccountTransactions if err := json.Unmarshal(body, &result); err != nil { return nil, err } return result.Data, nil } // getUSDTTransactionsWithLimit 获取指定数量的交易 func (m *USDTMan) getUSDTTransactionsWithLimit(address string, limit int) ([]TronGridTransaction, error) { url := fmt.Sprintf("%s/v1/accounts/%s/transactions/trc20?limit=%d&contract_address=%s&only_to=true", TronGridAPI, address, limit, USDTContractAddress) req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err } if m.apiKey != "" { req.Header.Set("TRON-PRO-API-KEY", m.apiKey) } resp, err := m.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } if resp.StatusCode != 200 { return nil, fmt.Errorf("TronGrid API error: %d %s", resp.StatusCode, string(body)) } var result TronGridAccountTransactions if err := json.Unmarshal(body, &result); err != nil { return nil, err } return result.Data, nil } // getTransactionBlock 获取交易的区块号 func (m *USDTMan) getTransactionBlock(txID string) (int64, error) { url := fmt.Sprintf("%s/wallet/gettransactioninfobyid?value=%s", TronGridAPI, txID) req, err := http.NewRequest("GET", url, nil) if err != nil { return 0, err } if m.apiKey != "" { req.Header.Set("TRON-PRO-API-KEY", m.apiKey) } resp, err := m.httpClient.Do(req) if err != nil { return 0, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return 0, err } if resp.StatusCode != 200 { return 0, fmt.Errorf("TronGrid API error: %d", resp.StatusCode) } var result struct { BlockNumber int64 `json:"blockNumber"` } fmt.Println("body", string(body)) if err := json.Unmarshal(body, &result); err != nil { return 0, err } return result.BlockNumber, nil } // getCurrentBlock 获取当前区块高度 func (m *USDTMan) getCurrentBlock() (int64, error) { url := fmt.Sprintf("%s/wallet/getnowblock", TronGridAPI) req, err := http.NewRequest("GET", url, nil) if err != nil { return 0, err } if m.apiKey != "" { req.Header.Set("TRON-PRO-API-KEY", m.apiKey) } resp, err := m.httpClient.Do(req) if err != nil { return 0, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return 0, err } if resp.StatusCode != 200 { return 0, fmt.Errorf("TronGrid API error: %d", resp.StatusCode) } var blockInfo TronGridBlockInfo if err := json.Unmarshal(body, &blockInfo); err != nil { return 0, err } return blockInfo.BlockHeader.RawData.Number, nil } // parseTransaction 解析交易(已废弃,直接在 checkAddress 中处理) // GetAddresses 获取所有监听地址 func (m *USDTMan) GetAddresses() []string { m.mu.RLock() defer m.mu.RUnlock() addresses := make([]string, len(m.addresses)) copy(addresses, m.addresses) return addresses } // AddAddress 添加监听地址 func (m *USDTMan) AddAddress(address string) { m.mu.Lock() defer m.mu.Unlock() // 检查是否已存在 for _, addr := range m.addresses { if addr == address { return } } m.addresses = append(m.addresses, address) } // RemoveAddress 移除监听地址 func (m *USDTMan) RemoveAddress(address string) { m.mu.Lock() defer m.mu.Unlock() for i, addr := range m.addresses { if addr == address { m.addresses = append(m.addresses[:i], m.addresses[i+1:]...) return } } } // QueryTransactions 主动查询交易记录(带过滤条件) func (m *USDTMan) QueryTransactions(filter QueryFilter) ([]*USDTPayment, error) { if filter.Address == "" { return nil, fmt.Errorf("address is required") } // 设置默认 limit limit := filter.Limit if limit == 0 { limit = m.maxHistoryTxns } // 获取交易列表 transactions, err := m.getUSDTTransactionsWithLimit(filter.Address, limit) if err != nil { return nil, err } // 获取当前区块(用于计算确认数) currentBlock, err := m.getCurrentBlock() if err != nil { return nil, err } var results []*USDTPayment for _, txn := range transactions { // 时间过滤 if filter.StartTime > 0 && txn.BlockTimestamp < filter.StartTime { continue } if filter.EndTime > 0 && txn.BlockTimestamp > filter.EndTime { continue } // 只处理转入该地址的交易 if !strings.EqualFold(txn.To, filter.Address) { continue } // 解析金额 amount := new(big.Int) amount, ok := amount.SetString(txn.Value, 10) if !ok || amount.Sign() <= 0 { continue } // 金额过滤 if filter.MinAmount != nil && amount.Cmp(filter.MinAmount) < 0 { continue } // 获取区块号 blockNumber, err := m.getTransactionBlock(txn.TransactionID) if err != nil { continue } // 计算确认数 confirmations := currentBlock - blockNumber // 确认数过滤 if filter.MinConfirmations > 0 && confirmations < filter.MinConfirmations { continue } payment := &USDTPayment{ Address: filter.Address, Amount: amount, TxID: txn.TransactionID, BlockNumber: blockNumber, Timestamp: txn.BlockTimestamp, From: txn.From, Confirmations: confirmations, } results = append(results, payment) } return results, nil }