Files
usdtman/tron_usdt_monitor.go
2026-02-03 15:05:06 +08:00

598 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package usdtman
import (
"context"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
// Config holds the settings NewUSDTMan uses to build a USDTMan.
// Zero values of QueryInterval, MinConfirmations and MaxHistoryTxns are
// replaced by the defaults noted on each field.
type Config struct {
Addresses []string // addresses to monitor
APIKey string // TronGrid API key; sent as the TRON-PRO-API-KEY header when non-empty
QueryInterval time.Duration // polling interval (a time.Duration, not a bare number of seconds); defaults to 5 seconds
MinConfirmations int64 // minimum confirmations before a payment is reported; defaults to 6
MaxHistoryTxns int // maximum number of historical transactions fetched per query; defaults to 20
ProxyURL string // HTTP/SOCKS5 proxy address, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080"
Transport http.RoundTripper // custom transport; takes precedence over ProxyURL
}
// USDTMan polls TronGrid for incoming USDT (TRC20) transfers to a set of
// addresses and reports each sufficiently confirmed transfer through a callback.
type USDTMan struct {
mu sync.RWMutex // guards addresses, running and onPaymentComplete
addresses []string // addresses being monitored
apiKey string // TronGrid API key; empty means no key header is sent
queryInterval time.Duration // interval between polling rounds
running bool // true between a successful Start and the next Stop
ctx context.Context // its cancellation ends the monitor loop
cancel context.CancelFunc // cancels ctx; called by Stop
onPaymentComplete func(*USDTPayment) // callback invoked for each confirmed payment; may be nil
minConfirmations int64 // confirmations required before the callback fires
maxHistoryTxns int // default number of transactions requested per query
processedTxns map[string]bool // transaction IDs already reported
txnMutex sync.RWMutex // guards processedTxns
httpClient *http.Client // HTTP client used for all TronGrid requests
}
// QueryFilter describes the criteria applied by QueryTransactions.
type QueryFilter struct {
Address string // address to query (required)
StartTime int64 // start timestamp in milliseconds; 0 means no lower bound
EndTime int64 // end timestamp in milliseconds; 0 means no upper bound
MinAmount *big.Int // minimum amount in micro USDT; nil means no minimum
MinConfirmations int64 // minimum confirmations; 0 means no minimum
Limit int // number of transactions to request; 0 uses the manager's default
}
// USDTPayment describes a single incoming USDT (TRC20) transfer.
type USDTPayment struct {
	Address       string   // receiving address
	Amount        *big.Int // raw amount in micro USDT (10^-6 USDT), e.g. 13260000 = 13.26 USDT
	TxID          string   // transaction ID
	BlockNumber   int64    // number of the block that contains the transaction
	Timestamp     int64    // block timestamp as reported by TronGrid
	From          string   // sending address
	Confirmations int64    // current block height minus BlockNumber when the payment was inspected
}

// GetAmountFloat returns the amount in USDT as a float64.
//
// The result is an approximation: very large amounts cannot be represented
// exactly by a float64. Use GetAmountString or Amount when exactness matters.
// A nil receiver or nil Amount yields 0.
func (p *USDTPayment) GetAmountFloat() float64 {
	if p == nil || p.Amount == nil {
		return 0
	}
	divisor := new(big.Float).SetInt64(1000000)
	amount := new(big.Float).SetInt(p.Amount)
	// The accuracy flag is deliberately ignored; this accessor is documented
	// as approximate.
	f, _ := new(big.Float).Quo(amount, divisor).Float64()
	return f
}

// GetAmountString returns the amount in USDT with exactly six decimal places.
//
// The value is formatted with integer arithmetic so no precision is lost,
// however large the amount is. A nil receiver or nil Amount yields "0.000000".
func (p *USDTPayment) GetAmountString() string {
	if p == nil || p.Amount == nil {
		return "0.000000"
	}
	// Split the absolute value into whole USDT and the micro-USDT remainder;
	// the sign is re-attached afterwards so that e.g. -1 prints as -0.000001.
	magnitude := new(big.Int).Abs(p.Amount)
	whole, frac := new(big.Int).QuoRem(magnitude, big.NewInt(1000000), new(big.Int))
	sign := ""
	if p.Amount.Sign() < 0 {
		sign = "-"
	}
	return fmt.Sprintf("%s%s.%06d", sign, whole.String(), frac.Int64())
}

// MarshalJSON implements json.Marshaler.
//
// Amount is emitted as a float (in USDT) for convenience and AmountRaw carries
// the exact micro-USDT value as a decimal string. A nil Amount is encoded as
// zero in both fields.
func (p *USDTPayment) MarshalJSON() ([]byte, error) {
	raw := "0"
	if p.Amount != nil {
		raw = p.Amount.String()
	}
	return json.Marshal(struct {
		Address       string  `json:"Address"`
		Amount        float64 `json:"Amount"`
		AmountRaw     string  `json:"AmountRaw"`
		TxID          string  `json:"TxID"`
		BlockNumber   int64   `json:"BlockNumber"`
		Timestamp     int64   `json:"Timestamp"`
		From          string  `json:"From"`
		Confirmations int64   `json:"Confirmations"`
	}{
		Address:       p.Address,
		Amount:        p.GetAmountFloat(),
		AmountRaw:     raw,
		TxID:          p.TxID,
		BlockNumber:   p.BlockNumber,
		Timestamp:     p.Timestamp,
		From:          p.From,
		Confirmations: p.Confirmations,
	})
}
// TronGridTransaction is one TRC20 transaction record as decoded from the
// TronGrid account TRC20 transactions response.
type TronGridTransaction struct {
TransactionID string `json:"transaction_id"`
BlockTimestamp int64 `json:"block_timestamp"`
From string `json:"from"`
To string `json:"to"`
Type string `json:"type"`
Value string `json:"value"` // amount; callers in this file parse it as a base-10 integer
TokenInfo struct {
Symbol string `json:"symbol"`
Address string `json:"address"`
Decimals int `json:"decimals"`
Name string `json:"name"`
} `json:"token_info"`
}
// TronGridAccountTransactions is the envelope of the TronGrid account TRC20
// transactions response: a success flag, the transaction list and paging metadata.
type TronGridAccountTransactions struct {
Success bool `json:"success"`
Data []TronGridTransaction `json:"data"`
Meta struct {
At int64 `json:"at"`
PageSize int `json:"page_size"`
} `json:"meta"`
}
// TronGridBlockInfo is the subset of a block response that this package
// decodes: only the block number inside the block header's raw data.
type TronGridBlockInfo struct {
BlockHeader struct {
RawData struct {
Number int64 `json:"number"`
} `json:"raw_data"`
} `json:"block_header"`
}
const (
// TronGridAPI is the base URL prepended to every TronGrid request path.
TronGridAPI = "https://api.trongrid.io"
// USDTContractAddress is the USDT TRC20 contract address used to filter
// transaction queries.
USDTContractAddress = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t" // USDT TRC20
)
// NewUSDTMan builds a USDTMan from config.
//
// Unset (zero) values of QueryInterval, MinConfirmations and MaxHistoryTxns
// fall back to 5 seconds, 6 and 20 respectively; non-positive QueryInterval
// and MaxHistoryTxns are treated as unset. The HTTP client uses a 10 second
// timeout. config.Transport, when set, takes precedence over config.ProxyURL;
// if ProxyURL cannot be parsed a warning is printed and a direct connection is
// used. The returned manager is idle until Start is called.
func NewUSDTMan(config Config) *USDTMan {
	ctx, cancel := context.WithCancel(context.Background())

	// A non-positive interval would make time.NewTicker panic in the monitor
	// loop, and a non-positive limit is rejected by the API, so both are
	// treated as "use the default".
	if config.QueryInterval <= 0 {
		config.QueryInterval = 5 * time.Second
	}
	if config.MinConfirmations == 0 {
		config.MinConfirmations = 6
	}
	if config.MaxHistoryTxns <= 0 {
		config.MaxHistoryTxns = 20
	}

	httpClient := &http.Client{
		Timeout: 10 * time.Second,
	}

	switch {
	case config.Transport != nil:
		// A caller-supplied transport wins over ProxyURL.
		httpClient.Transport = config.Transport
	case config.ProxyURL != "":
		proxyURL, err := url.Parse(config.ProxyURL)
		if err != nil {
			fmt.Printf("⚠️ 代理 URL 解析失败: %v使用直连\n", err)
			break
		}
		httpClient.Transport = &http.Transport{
			Proxy: http.ProxyURL(proxyURL),
		}
	}

	// Copy the address list: RemoveAddress shifts elements in place, which
	// would otherwise silently rewrite the caller's slice.
	addresses := append([]string(nil), config.Addresses...)

	return &USDTMan{
		addresses:        addresses,
		apiKey:           config.APIKey,
		queryInterval:    config.QueryInterval,
		minConfirmations: config.MinConfirmations,
		maxHistoryTxns:   config.MaxHistoryTxns,
		httpClient:       httpClient,
		ctx:              ctx,
		cancel:           cancel,
		processedTxns:    make(map[string]bool),
	}
}
// OnPaymentComplete registers the callback that the monitor invokes for each
// confirmed incoming payment, replacing any callback set earlier.
func (m *USDTMan) OnPaymentComplete(callback func(*USDTPayment)) {
	m.mu.Lock()
	m.onPaymentComplete = callback
	m.mu.Unlock()
}
// Start launches the background polling goroutine.
//
// It returns an error if the monitor is already running. Start may be called
// again after Stop: a fresh context is created when the previous one has been
// cancelled, so the new loop does not exit immediately.
func (m *USDTMan) Start() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		return fmt.Errorf("monitor already running")
	}
	// A cancelled context can never be reused; without a new one the loop
	// would return at once while running stayed true.
	if m.ctx == nil || m.ctx.Err() != nil {
		m.ctx, m.cancel = context.WithCancel(context.Background())
	}
	m.running = true
	// The context and interval are bound here, under the lock, so the
	// goroutine never reads fields a later Start could replace.
	go m.runMonitor(m.ctx, m.queryInterval)
	return nil
}

// Stop cancels the monitor loop. It is a no-op when the monitor is not running.
func (m *USDTMan) Stop() {
	m.mu.Lock()
	if !m.running {
		m.mu.Unlock()
		return
	}
	m.running = false
	cancel := m.cancel // captured under the lock; Start may replace it later
	m.mu.Unlock()

	if cancel != nil {
		cancel()
	}
}

// monitorLoop runs the polling loop with the manager's current context and
// interval, blocking until that context is cancelled.
func (m *USDTMan) monitorLoop() {
	m.mu.RLock()
	ctx, interval := m.ctx, m.queryInterval
	m.mu.RUnlock()
	m.runMonitor(ctx, interval)
}

// runMonitor checks all addresses once per interval until ctx is cancelled.
func (m *USDTMan) runMonitor(ctx context.Context, interval time.Duration) {
	// time.NewTicker panics on a non-positive interval, which would take the
	// whole process down from this background goroutine.
	if interval <= 0 {
		interval = 5 * time.Second
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m.checkAllAddresses()
		}
	}
}
// checkAllAddresses runs checkAddress for every monitored address.
// It works on a snapshot taken under the read lock, so the lock is not held
// while the per-address checks run.
func (m *USDTMan) checkAllAddresses() {
	m.mu.RLock()
	snapshot := append([]string(nil), m.addresses...)
	m.mu.RUnlock()

	for i := range snapshot {
		m.checkAddress(snapshot[i])
	}
}
// checkAddress fetches the recent USDT transactions of address and invokes the
// payment callback once for every incoming transfer that has reached the
// required number of confirmations and has not been reported before.
//
// Errors are printed and the affected transaction (or the whole round, when
// the transaction list or current block cannot be fetched) is skipped; it is
// retried on the next poll.
func (m *USDTMan) checkAddress(address string) {
	transactions, err := m.getUSDTTransactions(address)
	if err != nil {
		fmt.Printf("Error checking address %s: %v\n", address, err)
		return
	}

	// The current block is fetched lazily, only once a transaction actually
	// needs its confirmations computed.
	var (
		currentBlock int64
		haveBlock    bool
	)

	for _, txn := range transactions {
		// Skip transactions that were already reported.
		m.txnMutex.RLock()
		processed := m.processedTxns[txn.TransactionID]
		m.txnMutex.RUnlock()
		if processed {
			continue
		}

		// Cheap local checks come first so that no HTTP request is spent on
		// transactions that would be discarded anyway.
		if !strings.EqualFold(txn.To, address) {
			continue
		}
		amount, ok := new(big.Int).SetString(txn.Value, 10)
		if !ok || amount.Sign() <= 0 {
			continue
		}

		if !haveBlock {
			currentBlock, err = m.getCurrentBlock()
			if err != nil {
				fmt.Printf("Error getting current block: %v\n", err)
				return
			}
			haveBlock = true
		}

		blockNumber, blockErr := m.getTransactionBlock(txn.TransactionID)
		if blockErr != nil {
			fmt.Printf("Error getting transaction block: %v\n", blockErr)
			continue
		}
		// A non-positive block number means the block is unknown; computing
		// confirmations from it would make the transfer look fully confirmed.
		if blockNumber <= 0 {
			continue
		}

		confirmations := currentBlock - blockNumber
		if confirmations < m.minConfirmations {
			continue
		}

		payment := &USDTPayment{
			Address:       address,
			Amount:        amount,
			TxID:          txn.TransactionID,
			BlockNumber:   blockNumber,
			Timestamp:     txn.BlockTimestamp,
			From:          txn.From,
			Confirmations: confirmations,
		}

		// Mark as processed before the callback so it is reported only once.
		m.txnMutex.Lock()
		m.processedTxns[txn.TransactionID] = true
		m.txnMutex.Unlock()

		m.mu.RLock()
		callback := m.onPaymentComplete
		m.mu.RUnlock()
		if callback != nil {
			callback(payment)
		}
	}
}
// getUSDTTransactions returns the most recent incoming USDT transactions of
// address, requesting the manager's default number of entries (maxHistoryTxns).
// It delegates to getUSDTTransactionsWithLimit so the request logic lives in
// one place.
func (m *USDTMan) getUSDTTransactions(address string) ([]TronGridTransaction, error) {
	return m.getUSDTTransactionsWithLimit(address, m.maxHistoryTxns)
}
// getUSDTTransactionsWithLimit requests up to limit incoming USDT (TRC20)
// transactions of address from TronGrid and returns the decoded list.
//
// It returns an error when the request cannot be built or sent, the body
// cannot be read, the status is not 200, the body is not valid JSON, or the
// response reports success=false.
func (m *USDTMan) getUSDTTransactionsWithLimit(address string, limit int) ([]TronGridTransaction, error) {
	// The local is named endpoint, not url, so the net/url package stays
	// usable; the address is escaped because it can come from callers.
	endpoint := fmt.Sprintf("%s/v1/accounts/%s/transactions/trc20?limit=%d&contract_address=%s&only_to=true",
		TronGridAPI, url.PathEscape(address), limit, url.QueryEscape(USDTContractAddress))

	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("building TRC20 transactions request: %w", err)
	}
	if m.apiKey != "" {
		req.Header.Set("TRON-PRO-API-KEY", m.apiKey)
	}

	resp, err := m.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("requesting TRC20 transactions: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading TRC20 transactions response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("TronGrid API error: %d %s", resp.StatusCode, string(body))
	}

	var result TronGridAccountTransactions
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("decoding TRC20 transactions response: %w", err)
	}
	// A 200 response can still carry a failure flag; treating it as an empty
	// list would hide the problem from callers.
	if !result.Success {
		return nil, fmt.Errorf("TronGrid API reported failure: %s", string(body))
	}
	return result.Data, nil
}
// getTransactionBlock returns the number of the block that contains the
// transaction identified by txID.
//
// It returns an error when the request fails, the status is not 200, the body
// is not valid JSON, or the response carries no positive blockNumber. The last
// case matters to callers: a zero block number would otherwise be turned into
// a huge confirmation count.
func (m *USDTMan) getTransactionBlock(txID string) (int64, error) {
	endpoint := fmt.Sprintf("%s/wallet/gettransactioninfobyid?value=%s", TronGridAPI, url.QueryEscape(txID))

	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return 0, fmt.Errorf("building transaction info request: %w", err)
	}
	if m.apiKey != "" {
		req.Header.Set("TRON-PRO-API-KEY", m.apiKey)
	}

	resp, err := m.httpClient.Do(req)
	if err != nil {
		return 0, fmt.Errorf("requesting transaction info: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("reading transaction info response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("TronGrid API error: %d", resp.StatusCode)
	}

	var result struct {
		BlockNumber int64 `json:"blockNumber"`
	}
	if err := json.Unmarshal(body, &result); err != nil {
		return 0, fmt.Errorf("decoding transaction info response: %w", err)
	}
	if result.BlockNumber <= 0 {
		return 0, fmt.Errorf("no block number available for transaction %s", txID)
	}
	return result.BlockNumber, nil
}
// getCurrentBlock asks TronGrid for the latest block and returns its number.
// Any transport, read, non-200 status or JSON decoding failure is returned as
// an error.
func (m *USDTMan) getCurrentBlock() (int64, error) {
	request, err := http.NewRequest(http.MethodGet, TronGridAPI+"/wallet/getnowblock", nil)
	if err != nil {
		return 0, err
	}
	if key := m.apiKey; key != "" {
		request.Header.Set("TRON-PRO-API-KEY", key)
	}

	response, err := m.httpClient.Do(request)
	if err != nil {
		return 0, err
	}
	defer response.Body.Close()

	// The body is read before the status check, so a read failure is reported
	// in preference to a bad status.
	payload, err := io.ReadAll(response.Body)
	if err != nil {
		return 0, err
	}
	if status := response.StatusCode; status != http.StatusOK {
		return 0, fmt.Errorf("TronGrid API error: %d", status)
	}

	var latest TronGridBlockInfo
	err = json.Unmarshal(payload, &latest)
	if err != nil {
		return 0, err
	}
	return latest.BlockHeader.RawData.Number, nil
}
// NOTE: parseTransaction is obsolete; transactions are parsed directly in
// checkAddress.

// GetAddresses returns a copy of the monitored address list; modifying the
// returned slice does not affect the manager.
func (m *USDTMan) GetAddresses() []string {
	m.mu.RLock()
	snapshot := make([]string, len(m.addresses))
	copy(snapshot, m.addresses)
	m.mu.RUnlock()
	return snapshot
}
// AddAddress starts monitoring address. Adding an address that is already
// monitored is a no-op.
func (m *USDTMan) AddAddress(address string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	known := false
	for i := range m.addresses {
		if m.addresses[i] == address {
			known = true
			break
		}
	}
	if known {
		return
	}
	m.addresses = append(m.addresses, address)
}
// RemoveAddress stops monitoring address. Only the first matching entry is
// removed; an unknown address leaves the list unchanged.
func (m *USDTMan) RemoveAddress(address string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	idx := -1
	for i := range m.addresses {
		if m.addresses[i] == address {
			idx = i
			break
		}
	}
	if idx < 0 {
		return
	}

	// Shift the tail left by one within the same backing array, then shrink.
	copy(m.addresses[idx:], m.addresses[idx+1:])
	m.addresses = m.addresses[:len(m.addresses)-1]
}
// QueryTransactions fetches the recent incoming USDT transactions of
// filter.Address and returns those that satisfy every condition in filter.
//
// filter.Address is required. A non-positive filter.Limit falls back to the
// manager's default. An error is returned when the address is missing or when
// the transaction list or the current block cannot be fetched. A transaction
// whose block number cannot be determined is logged and left out of the
// result rather than failing the whole query.
func (m *USDTMan) QueryTransactions(filter QueryFilter) ([]*USDTPayment, error) {
	if filter.Address == "" {
		return nil, fmt.Errorf("address is required")
	}

	// A negative limit is never meaningful to the API, so it is treated like
	// the zero "use default" value.
	limit := filter.Limit
	if limit <= 0 {
		limit = m.maxHistoryTxns
	}

	transactions, err := m.getUSDTTransactionsWithLimit(filter.Address, limit)
	if err != nil {
		return nil, err
	}

	// The current block is needed to compute confirmations.
	currentBlock, err := m.getCurrentBlock()
	if err != nil {
		return nil, err
	}

	var results []*USDTPayment
	for _, txn := range transactions {
		// Time window.
		if filter.StartTime > 0 && txn.BlockTimestamp < filter.StartTime {
			continue
		}
		if filter.EndTime > 0 && txn.BlockTimestamp > filter.EndTime {
			continue
		}

		// Only transfers into the queried address.
		if !strings.EqualFold(txn.To, filter.Address) {
			continue
		}

		amount, ok := new(big.Int).SetString(txn.Value, 10)
		if !ok || amount.Sign() <= 0 {
			continue
		}
		if filter.MinAmount != nil && amount.Cmp(filter.MinAmount) < 0 {
			continue
		}

		blockNumber, blockErr := m.getTransactionBlock(txn.TransactionID)
		if blockErr != nil {
			// Logged the same way checkAddress does, so a partial result is
			// not returned silently.
			fmt.Printf("Error getting transaction block: %v\n", blockErr)
			continue
		}
		// A non-positive block number means the block is unknown; the
		// confirmation count derived from it would be meaningless.
		if blockNumber <= 0 {
			continue
		}

		confirmations := currentBlock - blockNumber
		if filter.MinConfirmations > 0 && confirmations < filter.MinConfirmations {
			continue
		}

		results = append(results, &USDTPayment{
			Address:       filter.Address,
			Amount:        amount,
			TxID:          txn.TransactionID,
			BlockNumber:   blockNumber,
			Timestamp:     txn.BlockTimestamp,
			From:          txn.From,
			Confirmations: confirmations,
		})
	}
	return results, nil
}