// Package coinman:USDT(TRC20) 多地址收款、轮询回调,支持分布式部署 package coinman import ( "context" "encoding/json" "fmt" "io" "net/http" "sync" "time" "github.com/redis/go-redis/v9" ) const ( tronGridBase = "https://api.trongrid.io" usdtContract = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t" redisKeyAddrs = "coinman:addresses" redisKeyLastTs = "coinman:last_ts:%s" redisKeyNotified = "coinman:notified:%s" // 分布式去重:仅第一个实例回调 redisKeySeen = "coinman:seen:%s" // 已处理 tx_id 集合,避免重复 pollDefault = 10 * time.Second ) // OrderInfo 单笔到账订单信息 type OrderInfo struct { From string `json:"from"` To string `json:"to"` Value string `json:"value"` TxID string `json:"tx_id"` BlockTs int64 `json:"block_ts"` } // Coinman 收款管理(多地址、分布式) type Coinman struct { redis *redis.Client ctx context.Context onComplete func(address string, order OrderInfo) notifyStream string // 可选:到账时 XADD 到此 Stream,供多消费者 pollInterval time.Duration mu sync.Mutex started bool stopCh chan struct{} memAddrs map[string]struct{} // redis 为 nil 时使用 } // New 创建 Coinman,redis 为 nil 时仅内存模式(单机) func New(redisClient *redis.Client) *Coinman { c := &Coinman{ redis: redisClient, ctx: context.Background(), pollInterval: pollDefault, stopCh: make(chan struct{}), memAddrs: make(map[string]struct{}), } return c } // Start 开启轮询与回调 func (c *Coinman) Start() { c.mu.Lock() if c.started { c.mu.Unlock() return } c.started = true c.mu.Unlock() go c.pollLoop() } // OnOrderComplete 设置到账回调:每笔新到账调用一次 (收款地址, 订单信息) func (c *Coinman) OnOrderComplete(fn func(address string, order OrderInfo)) { c.mu.Lock() c.onComplete = fn c.mu.Unlock() } // SetNotifyStream 设置 Redis Stream:到账时 XADD,多消费者可独立消费,无需额外 MQ func (c *Coinman) SetNotifyStream(stream string) { c.mu.Lock() c.notifyStream = stream c.mu.Unlock() } // AddPaymentAddress 动态添加收款地址(批量) func (c *Coinman) AddPaymentAddress(addrs []string) { if len(addrs) == 0 { return } if c.redis != nil { for _, a := range addrs { if a != "" { c.redis.SAdd(c.ctx, redisKeyAddrs, a) } } return } c.mu.Lock() for _, a := range addrs { if a != "" { c.memAddrs[a] = struct{}{} } } c.mu.Unlock() } // RemovePaymentAddress 动态删除收款地址(批量) func (c *Coinman) RemovePaymentAddress(addrs []string) { if len(addrs) == 0 { return } if c.redis != nil { c.redis.SRem(c.ctx, redisKeyAddrs, addrs) return } c.mu.Lock() for _, a := range addrs { delete(c.memAddrs, a) } c.mu.Unlock() } // ListPaymentAddresses 返回当前监听的收款地址列表 func (c *Coinman) ListPaymentAddresses() []string { return c.getAddresses() } func (c *Coinman) getAddresses() []string { if c.redis != nil { list, _ := c.redis.SMembers(c.ctx, redisKeyAddrs).Result() return list } c.mu.Lock() defer c.mu.Unlock() out := make([]string, 0, len(c.memAddrs)) for a := range c.memAddrs { out = append(out, a) } return out } func (c *Coinman) getLastTs(addr string) int64 { if c.redis != nil { s, _ := c.redis.Get(c.ctx, fmt.Sprintf(redisKeyLastTs, addr)).Int64() return s } return 0 // 内存模式在 poll 里用本地 map 存 lastTs,见 pollLoop } func (c *Coinman) setLastTs(addr string, ts int64) { if c.redis != nil { c.redis.Set(c.ctx, fmt.Sprintf(redisKeyLastTs, addr), ts, 0) } } // 分布式去重:仅一个实例能“认领”该 tx 并回调 func (c *Coinman) claimNotify(txID string) bool { if c.redis != nil { key := fmt.Sprintf(redisKeyNotified, txID) ok, _ := c.redis.SetNX(c.ctx, key, "1", 7*24*time.Hour).Result() return ok } return true } func (c *Coinman) seen(addr, txID string) bool { if c.redis != nil { key := fmt.Sprintf(redisKeySeen, addr) n, _ := c.redis.SAdd(c.ctx, key, txID).Result() return n == 0 // 已存在 } return false } func (c *Coinman) markSeen(addr, txID string) { if c.redis != nil { key := fmt.Sprintf(redisKeySeen, addr) c.redis.SAdd(c.ctx, key, txID) c.redis.Expire(c.ctx, key, 7*24*time.Hour) } } // 内存模式下的 lastTs / seen 存在 Coinman 里 type addrState struct { lastTs int64 seen map[string]bool } var memState = struct { sync.RWMutex m map[string]*addrState }{m: make(map[string]*addrState)} func (c *Coinman) getLastTsMem(addr string) int64 { memState.RLock() defer memState.RUnlock() if s, ok := memState.m[addr]; ok { return s.lastTs } return 0 } func (c *Coinman) setLastTsMem(addr string, ts int64) { memState.Lock() defer memState.Unlock() if memState.m[addr] == nil { memState.m[addr] = &addrState{seen: make(map[string]bool)} } memState.m[addr].lastTs = ts } func (c *Coinman) seenMem(addr, txID string) bool { memState.Lock() defer memState.Unlock() if memState.m[addr] == nil { memState.m[addr] = &addrState{seen: make(map[string]bool)} } _, ok := memState.m[addr].seen[txID] return ok } func (c *Coinman) markSeenMem(addr, txID string) { memState.Lock() defer memState.Unlock() if memState.m[addr] == nil { memState.m[addr] = &addrState{seen: make(map[string]bool)} } memState.m[addr].seen[txID] = true } func (c *Coinman) publishOrder(stream, addr string, order OrderInfo) { payload, _ := json.Marshal(map[string]interface{}{"address": addr, "order": order}) c.redis.XAdd(c.ctx, &redis.XAddArgs{ Stream: stream, Values: map[string]interface{}{"address": addr, "data": string(payload)}, }) } func (c *Coinman) pollLoop() { ticker := time.NewTicker(c.pollInterval) defer ticker.Stop() for { select { case <-c.stopCh: return case <-ticker.C: c.pollOnce() } } } func (c *Coinman) pollOnce() { addrs := c.getAddresses() if len(addrs) == 0 { return } c.mu.Lock() onComplete := c.onComplete notifyStream := c.notifyStream c.mu.Unlock() if onComplete == nil && notifyStream == "" { return } for _, addr := range addrs { var lastTs int64 if c.redis != nil { lastTs = c.getLastTs(addr) } else { lastTs = c.getLastTsMem(addr) } transfers, err := fetchIncoming(addr, lastTs, 50) if err != nil { continue } var maxTs int64 for _, t := range transfers { if !t.Confirmed { continue } seen := c.redis != nil && c.seen(addr, t.TxID) if !seen && c.redis == nil { seen = c.seenMem(addr, t.TxID) } if seen { if t.BlockTs > maxTs { maxTs = t.BlockTs } continue } if c.redis != nil { c.markSeen(addr, t.TxID) } else { c.markSeenMem(addr, t.TxID) } if t.BlockTs > maxTs { maxTs = t.BlockTs } if !c.claimNotify(t.TxID) { continue } order := OrderInfo{From: t.From, To: t.To, Value: t.Value, TxID: t.TxID, BlockTs: t.BlockTs} if notifyStream != "" && c.redis != nil { c.publishOrder(notifyStream, addr, order) } if onComplete != nil { onComplete(addr, order) } } if maxTs > 0 { if c.redis != nil { c.setLastTs(addr, maxTs) } else { c.setLastTsMem(addr, maxTs) } } } } type transfer struct { From string To string Value string TxID string BlockTs int64 Confirmed bool } // FetchIncoming 拉取某地址 TRC20-USDT 转入记录(供 HTTP 查询接口) func FetchIncoming(address string, sinceTs int64, limit int) ([]OrderInfo, error) { list, err := fetchIncoming(address, sinceTs, limit) if err != nil { return nil, err } out := make([]OrderInfo, 0, len(list)) for _, t := range list { if t.Confirmed { out = append(out, OrderInfo{From: t.From, To: t.To, Value: t.Value, TxID: t.TxID, BlockTs: t.BlockTs}) } } return out, nil } // GetTxStatus 查询单笔交易是否成功 func GetTxStatus(txID string) (confirmed bool, blockTs int64, err error) { url := fmt.Sprintf("%s/wallet/gettransactionbyid?value=%s", tronGridBase, txID) req, _ := http.NewRequest("GET", url, nil) resp, err := http.DefaultClient.Do(req) if err != nil { return false, 0, err } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) var raw struct { Ret []struct{ ContractRet string } `json:"ret"` BlockTimestamp int64 `json:"block_timestamp"` } if err := json.Unmarshal(body, &raw); err != nil { return false, 0, err } if len(raw.Ret) > 0 { confirmed = raw.Ret[0].ContractRet == "SUCCESS" } return confirmed, raw.BlockTimestamp, nil } func fetchIncoming(address string, sinceTs int64, limit int) ([]transfer, error) { url := fmt.Sprintf("%s/v1/accounts/%s/transactions/trc20?only_to=true&contract_address=%s&limit=%d", tronGridBase, address, usdtContract, limit) if sinceTs > 0 { url += "&min_timestamp=" + fmt.Sprintf("%d", sinceTs) } req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Accept", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) var raw struct { Data []struct { TxID string `json:"transaction_id"` BlockTs int64 `json:"block_timestamp"` From string `json:"from"` To string `json:"to"` Value string `json:"value"` ContractRet string `json:"contract_ret"` } `json:"data"` Success bool `json:"success"` } if err := json.Unmarshal(body, &raw); err != nil || !raw.Success { return nil, fmt.Errorf("trongrid api err") } list := make([]transfer, 0, len(raw.Data)) for _, d := range raw.Data { val := d.Value if len(val) > 6 { val = val[:len(val)-6] + "." + val[len(val)-6:] } else { val = "0." + val } list = append(list, transfer{ From: d.From, To: d.To, Value: val, TxID: d.TxID, BlockTs: d.BlockTs, Confirmed: d.ContractRet == "SUCCESS", }) } return list, nil }