diff --git a/servers/coinman/cmd/coinman/admin.html b/servers/coinman/cmd/coinman/admin.html deleted file mode 100644 index 278372b..0000000 --- a/servers/coinman/cmd/coinman/admin.html +++ /dev/null @@ -1,115 +0,0 @@ - - - - - - Coinman 管理 - - - -

Coinman 管理后台

- -
-

Webhook 回调地址

- - -
-
- -
-

添加收款地址

- - -
-
- -
-

当前监听地址

- - -
-
- - - - diff --git a/servers/coinman/cmd/coinman/main.go b/servers/coinman/cmd/coinman/main.go deleted file mode 100644 index e9d07ce..0000000 --- a/servers/coinman/cmd/coinman/main.go +++ /dev/null @@ -1,201 +0,0 @@ -package main - -import ( - "bytes" - "context" - "embed" - "encoding/json" - "log" - "net/http" - "os" - "strings" - - "rnpay/coinman" - - "github.com/redis/go-redis/v9" -) - -const redisKeyCallback = "coinman:callback_url" - -var ( - cm *coinman.Coinman - rdb *redis.Client - ctx = context.Background() -) - -//go:embed admin.html -var adminFS embed.FS - -func main() { - redisAddr := getEnv("REDIS_ADDR", "localhost:6379") - notifyStream := getEnv("NOTIFY_STREAM", "") - port := getEnv("PORT", ":16001") - - rdb = redis.NewClient(&redis.Options{Addr: redisAddr}) - if err := rdb.Ping(ctx).Err(); err != nil { - log.Fatalf("Redis: %v", err) - } - if u := getEnv("CALLBACK_URL", ""); u != "" { - rdb.Set(ctx, redisKeyCallback, u, 0) - } - - cm = coinman.New(rdb) - cm.OnOrderComplete(func(addr string, order coinman.OrderInfo) { - url, _ := rdb.Get(ctx, redisKeyCallback).Result() - if url == "" { - return - } - body, _ := json.Marshal(map[string]interface{}{"event": "usdt.received", "address": addr, "order": order}) - resp, err := http.Post(url, "application/json", bytes.NewReader(body)) - if err != nil { - log.Printf("[coinman] callback err: %v", err) - return - } - resp.Body.Close() - log.Printf("[coinman] callback -> %d", resp.StatusCode) - }) - if notifyStream != "" { - cm.SetNotifyStream(notifyStream) - } - cm.Start() - - http.HandleFunc("/start", cors(handleStart)) - http.HandleFunc("/addresses", cors(handleAddresses)) - http.HandleFunc("/addresses/add", cors(handleAddressesAdd)) - http.HandleFunc("/addresses/remove", cors(handleAddressesRemove)) - http.HandleFunc("/config/callback", cors(handleConfigCallback)) - http.HandleFunc("/admin", func(w http.ResponseWriter, r *http.Request) { - data, _ := adminFS.ReadFile("admin.html") - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.Write(data) - }) - - log.Printf("coinman listen %s (REDIS=%s, STREAM=%s)", port, redisAddr, boolStr(notifyStream != "")) - log.Fatal(http.ListenAndServe(port, nil)) -} - -func handleConfigCallback(w http.ResponseWriter, r *http.Request) { - if r.Method == http.MethodGet { - url, _ := rdb.Get(ctx, redisKeyCallback).Result() - jsonResp(w, true, "ok", map[string]interface{}{"callback_url": url}) - return - } - if r.Method != http.MethodPost { - jsonResp(w, false, "Method not allowed", nil) - return - } - var req struct { - CallbackURL string `json:"callback_url"` - } - _ = json.NewDecoder(r.Body).Decode(&req) - url := strings.TrimSpace(req.CallbackURL) - if url == "" { - rdb.Del(ctx, redisKeyCallback) - jsonResp(w, true, "ok", map[string]interface{}{"callback_url": ""}) - return - } - rdb.Set(ctx, redisKeyCallback, url, 0) - jsonResp(w, true, "ok", map[string]interface{}{"callback_url": url}) -} - -func getEnv(k, d string) string { - if v := os.Getenv(k); v != "" { - return v - } - return d -} - -func boolStr(b bool) string { - if b { - return "yes" - } - return "no" -} - -func cors(h http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type") - if r.Method == http.MethodOptions { - return - } - h(w, r) - } -} - -func jsonResp(w http.ResponseWriter, ok bool, msg string, data interface{}) { - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": ok, "msg": msg, "data": data}) -} - -func handleStart(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - jsonResp(w, false, "Method not allowed", nil) - return - } - cm.Start() - jsonResp(w, true, "started", nil) -} - -func handleAddresses(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - jsonResp(w, false, "Method not allowed", nil) - return - } - list := cm.ListPaymentAddresses() - jsonResp(w, true, "ok", map[string]interface{}{"addresses": list}) -} - -func handleAddressesAdd(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - jsonResp(w, false, "Method not allowed", nil) - return - } - var req struct { - Addresses []string `json:"addresses"` - } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - jsonResp(w, false, "invalid json", nil) - return - } - addrs := trimAddrs(req.Addresses) - if len(addrs) == 0 { - jsonResp(w, false, "addresses required", nil) - return - } - cm.AddPaymentAddress(addrs) - jsonResp(w, true, "ok", map[string]interface{}{"added": addrs}) -} - -func handleAddressesRemove(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - jsonResp(w, false, "Method not allowed", nil) - return - } - var req struct { - Addresses []string `json:"addresses"` - } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - jsonResp(w, false, "invalid json", nil) - return - } - addrs := trimAddrs(req.Addresses) - if len(addrs) == 0 { - jsonResp(w, false, "addresses required", nil) - return - } - cm.RemovePaymentAddress(addrs) - jsonResp(w, true, "ok", map[string]interface{}{"removed": addrs}) -} - -func trimAddrs(s []string) []string { - out := make([]string, 0, len(s)) - for _, a := range s { - a = strings.TrimSpace(a) - if a != "" { - out = append(out, a) - } - } - return out -} diff --git a/servers/coinman/coinman b/servers/coinman/coinman deleted file mode 100755 index 425e4a4..0000000 Binary files a/servers/coinman/coinman and /dev/null differ diff --git a/servers/coinman/coinman.go b/servers/coinman/coinman.go deleted file mode 100644 index fecdccd..0000000 --- a/servers/coinman/coinman.go +++ /dev/null @@ -1,413 +0,0 @@ -// Package coinman:USDT(TRC20) 多地址收款、轮询回调,支持分布式部署 -package coinman - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "sync" - "time" - - "github.com/redis/go-redis/v9" -) - -const ( - tronGridBase = "https://api.trongrid.io" - usdtContract = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t" - redisKeyAddrs = "coinman:addresses" - redisKeyLastTs = "coinman:last_ts:%s" - redisKeyNotified = "coinman:notified:%s" // 分布式去重:仅第一个实例回调 - redisKeySeen = "coinman:seen:%s" // 已处理 tx_id 集合,避免重复 - pollDefault = 10 * time.Second -) - -// OrderInfo 单笔到账订单信息 -type OrderInfo struct { - From string `json:"from"` - To string `json:"to"` - Value string `json:"value"` - TxID string `json:"tx_id"` - BlockTs int64 `json:"block_ts"` -} - -// Coinman 收款管理(多地址、分布式) -type Coinman struct { - redis *redis.Client - ctx context.Context - onComplete func(address string, order OrderInfo) - notifyStream string // 可选:到账时 XADD 到此 Stream,供多消费者 - pollInterval time.Duration - mu sync.Mutex - started bool - stopCh chan struct{} - memAddrs map[string]struct{} // redis 为 nil 时使用 -} - -// New 创建 Coinman,redis 为 nil 时仅内存模式(单机) -func New(redisClient *redis.Client) *Coinman { - c := &Coinman{ - redis: redisClient, - ctx: context.Background(), - pollInterval: pollDefault, - stopCh: make(chan struct{}), - memAddrs: make(map[string]struct{}), - } - return c -} - -// Start 开启轮询与回调 -func (c *Coinman) Start() { - c.mu.Lock() - if c.started { - c.mu.Unlock() - return - } - c.started = true - c.mu.Unlock() - go c.pollLoop() -} - -// OnOrderComplete 设置到账回调:每笔新到账调用一次 (收款地址, 订单信息) -func (c *Coinman) OnOrderComplete(fn func(address string, order OrderInfo)) { - c.mu.Lock() - c.onComplete = fn - c.mu.Unlock() -} - -// SetNotifyStream 设置 Redis Stream:到账时 XADD,多消费者可独立消费,无需额外 MQ -func (c *Coinman) SetNotifyStream(stream string) { - c.mu.Lock() - c.notifyStream = stream - c.mu.Unlock() -} - -// AddPaymentAddress 动态添加收款地址(批量) -func (c *Coinman) AddPaymentAddress(addrs []string) { - if len(addrs) == 0 { - return - } - if c.redis != nil { - for _, a := range addrs { - if a != "" { - c.redis.SAdd(c.ctx, redisKeyAddrs, a) - } - } - return - } - c.mu.Lock() - for _, a := range addrs { - if a != "" { - c.memAddrs[a] = struct{}{} - } - } - c.mu.Unlock() -} - -// RemovePaymentAddress 动态删除收款地址(批量) -func (c *Coinman) RemovePaymentAddress(addrs []string) { - if len(addrs) == 0 { - return - } - if c.redis != nil { - c.redis.SRem(c.ctx, redisKeyAddrs, addrs) - return - } - c.mu.Lock() - for _, a := range addrs { - delete(c.memAddrs, a) - } - c.mu.Unlock() -} - -// ListPaymentAddresses 返回当前监听的收款地址列表 -func (c *Coinman) ListPaymentAddresses() []string { - return c.getAddresses() -} - -func (c *Coinman) getAddresses() []string { - if c.redis != nil { - list, _ := c.redis.SMembers(c.ctx, redisKeyAddrs).Result() - return list - } - c.mu.Lock() - defer c.mu.Unlock() - out := make([]string, 0, len(c.memAddrs)) - for a := range c.memAddrs { - out = append(out, a) - } - return out -} - -func (c *Coinman) getLastTs(addr string) int64 { - if c.redis != nil { - s, _ := c.redis.Get(c.ctx, fmt.Sprintf(redisKeyLastTs, addr)).Int64() - return s - } - return 0 // 内存模式在 poll 里用本地 map 存 lastTs,见 pollLoop -} - -func (c *Coinman) setLastTs(addr string, ts int64) { - if c.redis != nil { - c.redis.Set(c.ctx, fmt.Sprintf(redisKeyLastTs, addr), ts, 0) - } -} - -// 分布式去重:仅一个实例能“认领”该 tx 并回调 -func (c *Coinman) claimNotify(txID string) bool { - if c.redis != nil { - key := fmt.Sprintf(redisKeyNotified, txID) - ok, _ := c.redis.SetNX(c.ctx, key, "1", 7*24*time.Hour).Result() - return ok - } - return true -} - -func (c *Coinman) seen(addr, txID string) bool { - if c.redis != nil { - key := fmt.Sprintf(redisKeySeen, addr) - n, _ := c.redis.SAdd(c.ctx, key, txID).Result() - return n == 0 // 已存在 - } - return false -} - -func (c *Coinman) markSeen(addr, txID string) { - if c.redis != nil { - key := fmt.Sprintf(redisKeySeen, addr) - c.redis.SAdd(c.ctx, key, txID) - c.redis.Expire(c.ctx, key, 7*24*time.Hour) - } -} - -// 内存模式下的 lastTs / seen 存在 Coinman 里 -type addrState struct { - lastTs int64 - seen map[string]bool -} -var memState = struct { - sync.RWMutex - m map[string]*addrState -}{m: make(map[string]*addrState)} - -func (c *Coinman) getLastTsMem(addr string) int64 { - memState.RLock() - defer memState.RUnlock() - if s, ok := memState.m[addr]; ok { - return s.lastTs - } - return 0 -} - -func (c *Coinman) setLastTsMem(addr string, ts int64) { - memState.Lock() - defer memState.Unlock() - if memState.m[addr] == nil { - memState.m[addr] = &addrState{seen: make(map[string]bool)} - } - memState.m[addr].lastTs = ts -} - -func (c *Coinman) seenMem(addr, txID string) bool { - memState.Lock() - defer memState.Unlock() - if memState.m[addr] == nil { - memState.m[addr] = &addrState{seen: make(map[string]bool)} - } - _, ok := memState.m[addr].seen[txID] - return ok -} - -func (c *Coinman) markSeenMem(addr, txID string) { - memState.Lock() - defer memState.Unlock() - if memState.m[addr] == nil { - memState.m[addr] = &addrState{seen: make(map[string]bool)} - } - memState.m[addr].seen[txID] = true -} - -func (c *Coinman) publishOrder(stream, addr string, order OrderInfo) { - payload, _ := json.Marshal(map[string]interface{}{"address": addr, "order": order}) - c.redis.XAdd(c.ctx, &redis.XAddArgs{ - Stream: stream, - Values: map[string]interface{}{"address": addr, "data": string(payload)}, - }) -} - -func (c *Coinman) pollLoop() { - ticker := time.NewTicker(c.pollInterval) - defer ticker.Stop() - for { - select { - case <-c.stopCh: - return - case <-ticker.C: - c.pollOnce() - } - } -} - -func (c *Coinman) pollOnce() { - addrs := c.getAddresses() - if len(addrs) == 0 { - return - } - c.mu.Lock() - onComplete := c.onComplete - notifyStream := c.notifyStream - c.mu.Unlock() - if onComplete == nil && notifyStream == "" { - return - } - - for _, addr := range addrs { - var lastTs int64 - if c.redis != nil { - lastTs = c.getLastTs(addr) - } else { - lastTs = c.getLastTsMem(addr) - } - transfers, err := fetchIncoming(addr, lastTs, 50) - if err != nil { - continue - } - var maxTs int64 - for _, t := range transfers { - if !t.Confirmed { - continue - } - seen := c.redis != nil && c.seen(addr, t.TxID) - if !seen && c.redis == nil { - seen = c.seenMem(addr, t.TxID) - } - if seen { - if t.BlockTs > maxTs { - maxTs = t.BlockTs - } - continue - } - if c.redis != nil { - c.markSeen(addr, t.TxID) - } else { - c.markSeenMem(addr, t.TxID) - } - if t.BlockTs > maxTs { - maxTs = t.BlockTs - } - if !c.claimNotify(t.TxID) { - continue - } - order := OrderInfo{From: t.From, To: t.To, Value: t.Value, TxID: t.TxID, BlockTs: t.BlockTs} - if notifyStream != "" && c.redis != nil { - c.publishOrder(notifyStream, addr, order) - } - if onComplete != nil { - onComplete(addr, order) - } - } - if maxTs > 0 { - if c.redis != nil { - c.setLastTs(addr, maxTs) - } else { - c.setLastTsMem(addr, maxTs) - } - } - } -} - -type transfer struct { - From string - To string - Value string - TxID string - BlockTs int64 - Confirmed bool -} - -// FetchIncoming 拉取某地址 TRC20-USDT 转入记录(供 HTTP 查询接口) -func FetchIncoming(address string, sinceTs int64, limit int) ([]OrderInfo, error) { - list, err := fetchIncoming(address, sinceTs, limit) - if err != nil { - return nil, err - } - out := make([]OrderInfo, 0, len(list)) - for _, t := range list { - if t.Confirmed { - out = append(out, OrderInfo{From: t.From, To: t.To, Value: t.Value, TxID: t.TxID, BlockTs: t.BlockTs}) - } - } - return out, nil -} - -// GetTxStatus 查询单笔交易是否成功 -func GetTxStatus(txID string) (confirmed bool, blockTs int64, err error) { - url := fmt.Sprintf("%s/wallet/gettransactionbyid?value=%s", tronGridBase, txID) - req, _ := http.NewRequest("GET", url, nil) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return false, 0, err - } - defer resp.Body.Close() - body, _ := io.ReadAll(resp.Body) - var raw struct { - Ret []struct{ ContractRet string } `json:"ret"` - BlockTimestamp int64 `json:"block_timestamp"` - } - if err := json.Unmarshal(body, &raw); err != nil { - return false, 0, err - } - if len(raw.Ret) > 0 { - confirmed = raw.Ret[0].ContractRet == "SUCCESS" - } - return confirmed, raw.BlockTimestamp, nil -} - -func fetchIncoming(address string, sinceTs int64, limit int) ([]transfer, error) { - url := fmt.Sprintf("%s/v1/accounts/%s/transactions/trc20?only_to=true&contract_address=%s&limit=%d", - tronGridBase, address, usdtContract, limit) - if sinceTs > 0 { - url += "&min_timestamp=" + fmt.Sprintf("%d", sinceTs) - } - req, _ := http.NewRequest("GET", url, nil) - req.Header.Set("Accept", "application/json") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - body, _ := io.ReadAll(resp.Body) - var raw struct { - Data []struct { - TxID string `json:"transaction_id"` - BlockTs int64 `json:"block_timestamp"` - From string `json:"from"` - To string `json:"to"` - Value string `json:"value"` - ContractRet string `json:"contract_ret"` - } `json:"data"` - Success bool `json:"success"` - } - if err := json.Unmarshal(body, &raw); err != nil || !raw.Success { - return nil, fmt.Errorf("trongrid api err") - } - list := make([]transfer, 0, len(raw.Data)) - for _, d := range raw.Data { - val := d.Value - if len(val) > 6 { - val = val[:len(val)-6] + "." + val[len(val)-6:] - } else { - val = "0." + val - } - list = append(list, transfer{ - From: d.From, - To: d.To, - Value: val, - TxID: d.TxID, - BlockTs: d.BlockTs, - Confirmed: d.ContractRet == "SUCCESS", - }) - } - return list, nil -} \ No newline at end of file diff --git a/servers/coinman/go.mod b/servers/coinman/go.mod deleted file mode 100644 index dcb07f7..0000000 --- a/servers/coinman/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module rnpay/coinman - -go 1.21 - -require github.com/redis/go-redis/v9 v9.5.1 - -require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect -) diff --git a/servers/coinman/go.sum b/servers/coinman/go.sum deleted file mode 100644 index a341487..0000000 --- a/servers/coinman/go.sum +++ /dev/null @@ -1,10 +0,0 @@ -github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= -github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= -github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= diff --git a/servers/coinman/main b/servers/coinman/main deleted file mode 100755 index 109674d..0000000 Binary files a/servers/coinman/main and /dev/null differ diff --git a/servers/usdtman b/servers/usdtman index f8d24bf..3d3e814 160000 --- a/servers/usdtman +++ b/servers/usdtman @@ -1 +1 @@ -Subproject commit f8d24bf7882fb2443359a8f464ac2eab2572f3d0 +Subproject commit 3d3e8148790660ce94dd4b146b0a7950e9b211f3 diff --git a/servers/walletman b/servers/walletman index 5f4e043..2882cca 160000 --- a/servers/walletman +++ b/servers/walletman @@ -1 +1 @@ -Subproject commit 5f4e0434b75f0433cb584ccd0c095dfa11e2a803 +Subproject commit 2882ccaffb216cbdd852e6b6d2d72de8e95c31ba