Files
rnpay/servers/coinman/coinman.go
2026-02-02 02:23:21 +08:00

413 lines
9.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package coinmanUSDT(TRC20) 多地址收款、轮询回调,支持分布式部署
package coinman
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
const (
tronGridBase = "https://api.trongrid.io"
usdtContract = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
redisKeyAddrs = "coinman:addresses"
redisKeyLastTs = "coinman:last_ts:%s"
redisKeyNotified = "coinman:notified:%s" // 分布式去重:仅第一个实例回调
redisKeySeen = "coinman:seen:%s" // 已处理 tx_id 集合,避免重复
pollDefault = 10 * time.Second
)
// OrderInfo 单笔到账订单信息
type OrderInfo struct {
From string `json:"from"`
To string `json:"to"`
Value string `json:"value"`
TxID string `json:"tx_id"`
BlockTs int64 `json:"block_ts"`
}
// Coinman 收款管理(多地址、分布式)
type Coinman struct {
redis *redis.Client
ctx context.Context
onComplete func(address string, order OrderInfo)
notifyStream string // 可选:到账时 XADD 到此 Stream供多消费者
pollInterval time.Duration
mu sync.Mutex
started bool
stopCh chan struct{}
memAddrs map[string]struct{} // redis 为 nil 时使用
}
// New 创建 Coinmanredis 为 nil 时仅内存模式(单机)
func New(redisClient *redis.Client) *Coinman {
c := &Coinman{
redis: redisClient,
ctx: context.Background(),
pollInterval: pollDefault,
stopCh: make(chan struct{}),
memAddrs: make(map[string]struct{}),
}
return c
}
// Start 开启轮询与回调
func (c *Coinman) Start() {
c.mu.Lock()
if c.started {
c.mu.Unlock()
return
}
c.started = true
c.mu.Unlock()
go c.pollLoop()
}
// OnOrderComplete 设置到账回调:每笔新到账调用一次 (收款地址, 订单信息)
func (c *Coinman) OnOrderComplete(fn func(address string, order OrderInfo)) {
c.mu.Lock()
c.onComplete = fn
c.mu.Unlock()
}
// SetNotifyStream 设置 Redis Stream到账时 XADD多消费者可独立消费无需额外 MQ
func (c *Coinman) SetNotifyStream(stream string) {
c.mu.Lock()
c.notifyStream = stream
c.mu.Unlock()
}
// AddPaymentAddress 动态添加收款地址(批量)
func (c *Coinman) AddPaymentAddress(addrs []string) {
if len(addrs) == 0 {
return
}
if c.redis != nil {
for _, a := range addrs {
if a != "" {
c.redis.SAdd(c.ctx, redisKeyAddrs, a)
}
}
return
}
c.mu.Lock()
for _, a := range addrs {
if a != "" {
c.memAddrs[a] = struct{}{}
}
}
c.mu.Unlock()
}
// RemovePaymentAddress 动态删除收款地址(批量)
func (c *Coinman) RemovePaymentAddress(addrs []string) {
if len(addrs) == 0 {
return
}
if c.redis != nil {
c.redis.SRem(c.ctx, redisKeyAddrs, addrs)
return
}
c.mu.Lock()
for _, a := range addrs {
delete(c.memAddrs, a)
}
c.mu.Unlock()
}
// ListPaymentAddresses 返回当前监听的收款地址列表
func (c *Coinman) ListPaymentAddresses() []string {
return c.getAddresses()
}
func (c *Coinman) getAddresses() []string {
if c.redis != nil {
list, _ := c.redis.SMembers(c.ctx, redisKeyAddrs).Result()
return list
}
c.mu.Lock()
defer c.mu.Unlock()
out := make([]string, 0, len(c.memAddrs))
for a := range c.memAddrs {
out = append(out, a)
}
return out
}
func (c *Coinman) getLastTs(addr string) int64 {
if c.redis != nil {
s, _ := c.redis.Get(c.ctx, fmt.Sprintf(redisKeyLastTs, addr)).Int64()
return s
}
return 0 // 内存模式在 poll 里用本地 map 存 lastTs见 pollLoop
}
func (c *Coinman) setLastTs(addr string, ts int64) {
if c.redis != nil {
c.redis.Set(c.ctx, fmt.Sprintf(redisKeyLastTs, addr), ts, 0)
}
}
// 分布式去重:仅一个实例能“认领”该 tx 并回调
func (c *Coinman) claimNotify(txID string) bool {
if c.redis != nil {
key := fmt.Sprintf(redisKeyNotified, txID)
ok, _ := c.redis.SetNX(c.ctx, key, "1", 7*24*time.Hour).Result()
return ok
}
return true
}
func (c *Coinman) seen(addr, txID string) bool {
if c.redis != nil {
key := fmt.Sprintf(redisKeySeen, addr)
n, _ := c.redis.SAdd(c.ctx, key, txID).Result()
return n == 0 // 已存在
}
return false
}
func (c *Coinman) markSeen(addr, txID string) {
if c.redis != nil {
key := fmt.Sprintf(redisKeySeen, addr)
c.redis.SAdd(c.ctx, key, txID)
c.redis.Expire(c.ctx, key, 7*24*time.Hour)
}
}
// 内存模式下的 lastTs / seen 存在 Coinman 里
type addrState struct {
lastTs int64
seen map[string]bool
}
var memState = struct {
sync.RWMutex
m map[string]*addrState
}{m: make(map[string]*addrState)}
func (c *Coinman) getLastTsMem(addr string) int64 {
memState.RLock()
defer memState.RUnlock()
if s, ok := memState.m[addr]; ok {
return s.lastTs
}
return 0
}
func (c *Coinman) setLastTsMem(addr string, ts int64) {
memState.Lock()
defer memState.Unlock()
if memState.m[addr] == nil {
memState.m[addr] = &addrState{seen: make(map[string]bool)}
}
memState.m[addr].lastTs = ts
}
func (c *Coinman) seenMem(addr, txID string) bool {
memState.Lock()
defer memState.Unlock()
if memState.m[addr] == nil {
memState.m[addr] = &addrState{seen: make(map[string]bool)}
}
_, ok := memState.m[addr].seen[txID]
return ok
}
func (c *Coinman) markSeenMem(addr, txID string) {
memState.Lock()
defer memState.Unlock()
if memState.m[addr] == nil {
memState.m[addr] = &addrState{seen: make(map[string]bool)}
}
memState.m[addr].seen[txID] = true
}
func (c *Coinman) publishOrder(stream, addr string, order OrderInfo) {
payload, _ := json.Marshal(map[string]interface{}{"address": addr, "order": order})
c.redis.XAdd(c.ctx, &redis.XAddArgs{
Stream: stream,
Values: map[string]interface{}{"address": addr, "data": string(payload)},
})
}
func (c *Coinman) pollLoop() {
ticker := time.NewTicker(c.pollInterval)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.pollOnce()
}
}
}
func (c *Coinman) pollOnce() {
addrs := c.getAddresses()
if len(addrs) == 0 {
return
}
c.mu.Lock()
onComplete := c.onComplete
notifyStream := c.notifyStream
c.mu.Unlock()
if onComplete == nil && notifyStream == "" {
return
}
for _, addr := range addrs {
var lastTs int64
if c.redis != nil {
lastTs = c.getLastTs(addr)
} else {
lastTs = c.getLastTsMem(addr)
}
transfers, err := fetchIncoming(addr, lastTs, 50)
if err != nil {
continue
}
var maxTs int64
for _, t := range transfers {
if !t.Confirmed {
continue
}
seen := c.redis != nil && c.seen(addr, t.TxID)
if !seen && c.redis == nil {
seen = c.seenMem(addr, t.TxID)
}
if seen {
if t.BlockTs > maxTs {
maxTs = t.BlockTs
}
continue
}
if c.redis != nil {
c.markSeen(addr, t.TxID)
} else {
c.markSeenMem(addr, t.TxID)
}
if t.BlockTs > maxTs {
maxTs = t.BlockTs
}
if !c.claimNotify(t.TxID) {
continue
}
order := OrderInfo{From: t.From, To: t.To, Value: t.Value, TxID: t.TxID, BlockTs: t.BlockTs}
if notifyStream != "" && c.redis != nil {
c.publishOrder(notifyStream, addr, order)
}
if onComplete != nil {
onComplete(addr, order)
}
}
if maxTs > 0 {
if c.redis != nil {
c.setLastTs(addr, maxTs)
} else {
c.setLastTsMem(addr, maxTs)
}
}
}
}
type transfer struct {
From string
To string
Value string
TxID string
BlockTs int64
Confirmed bool
}
// FetchIncoming 拉取某地址 TRC20-USDT 转入记录(供 HTTP 查询接口)
func FetchIncoming(address string, sinceTs int64, limit int) ([]OrderInfo, error) {
list, err := fetchIncoming(address, sinceTs, limit)
if err != nil {
return nil, err
}
out := make([]OrderInfo, 0, len(list))
for _, t := range list {
if t.Confirmed {
out = append(out, OrderInfo{From: t.From, To: t.To, Value: t.Value, TxID: t.TxID, BlockTs: t.BlockTs})
}
}
return out, nil
}
// GetTxStatus 查询单笔交易是否成功
func GetTxStatus(txID string) (confirmed bool, blockTs int64, err error) {
url := fmt.Sprintf("%s/wallet/gettransactionbyid?value=%s", tronGridBase, txID)
req, _ := http.NewRequest("GET", url, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, 0, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
var raw struct {
Ret []struct{ ContractRet string } `json:"ret"`
BlockTimestamp int64 `json:"block_timestamp"`
}
if err := json.Unmarshal(body, &raw); err != nil {
return false, 0, err
}
if len(raw.Ret) > 0 {
confirmed = raw.Ret[0].ContractRet == "SUCCESS"
}
return confirmed, raw.BlockTimestamp, nil
}
func fetchIncoming(address string, sinceTs int64, limit int) ([]transfer, error) {
url := fmt.Sprintf("%s/v1/accounts/%s/transactions/trc20?only_to=true&contract_address=%s&limit=%d",
tronGridBase, address, usdtContract, limit)
if sinceTs > 0 {
url += "&min_timestamp=" + fmt.Sprintf("%d", sinceTs)
}
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
var raw struct {
Data []struct {
TxID string `json:"transaction_id"`
BlockTs int64 `json:"block_timestamp"`
From string `json:"from"`
To string `json:"to"`
Value string `json:"value"`
ContractRet string `json:"contract_ret"`
} `json:"data"`
Success bool `json:"success"`
}
if err := json.Unmarshal(body, &raw); err != nil || !raw.Success {
return nil, fmt.Errorf("trongrid api err")
}
list := make([]transfer, 0, len(raw.Data))
for _, d := range raw.Data {
val := d.Value
if len(val) > 6 {
val = val[:len(val)-6] + "." + val[len(val)-6:]
} else {
val = "0." + val
}
list = append(list, transfer{
From: d.From,
To: d.To,
Value: val,
TxID: d.TxID,
BlockTs: d.BlockTs,
Confirmed: d.ContractRet == "SUCCESS",
})
}
return list, nil
}