417 lines
12 KiB
Go
417 lines
12 KiB
Go
package dao
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"go-common/app/service/openplatform/ticket-sales/model"
|
|
"go-common/app/service/openplatform/ticket-sales/model/consts"
|
|
"go-common/library/cache/redis"
|
|
"go-common/library/log"
|
|
"go-common/library/xstr"
|
|
)
|
|
|
|
//票号相关常量
|
|
const (
|
|
sqlCountRefundTicket = "SELECT COUNT(*) FROM ticket WHERE oid IN (%s) AND refund_status=?"
|
|
|
|
_selectTicketByOrderIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE oid=?"
|
|
_selectTicketByScreenIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE sid=? AND uid=? AND type != ?"
|
|
_selectTicketByIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE id IN (%s)"
|
|
_updateTicketStatusSQL = "UPDATE ticket set status=? WHERE id IN (%s)"
|
|
_selectTicketSendBySendTID = "SELECT id, sid, send_tid, recv_tid, send_uid, recv_uid, recv_tel, status, ctime, mtime, oid FROM ticket_send WHERE send_tid IN (%s)"
|
|
_selectTicketSendByRecvTID = "SELECT id, sid, send_tid, recv_tid, send_uid, recv_uid, recv_tel, status, ctime, mtime, oid FROM ticket_send WHERE recv_tid IN (%s)"
|
|
)
|
|
|
|
//rawRefundTicketCnt 统计用户已退票数
|
|
func (d *Dao) rawRefundTicketCnt(ctx context.Context, oids []int64) (cnt int64, err error) {
|
|
lo := len(oids)
|
|
if lo == 0 {
|
|
return
|
|
}
|
|
q := fmt.Sprintf(sqlCountRefundTicket, strings.Repeat(",?", lo)[1:])
|
|
a := make([]interface{}, lo+1)
|
|
a[lo] = consts.TkStatusRefunded
|
|
for k, v := range oids {
|
|
a[k] = v
|
|
}
|
|
err = d.db.QueryRow(ctx, q, a...).Scan(&cnt)
|
|
return
|
|
}
|
|
|
|
// CacheTicketsByOrderID 通过 order_id 获取 tickets 取缓存
|
|
func (d *Dao) CacheTicketsByOrderID(c context.Context, orderID int64) (res []*model.Ticket, err error) {
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyOrderTickets, orderID)))
|
|
if err != nil {
|
|
if err == redis.ErrNil {
|
|
err = nil
|
|
return
|
|
}
|
|
log.Error("d.CacheTicketsByOrderID(%d) error(%v)", orderID, err)
|
|
return
|
|
}
|
|
|
|
if err = json.Unmarshal(reply, &res); err != nil {
|
|
log.Error("d.CacheTicketsByOrderID(%d) json.Unmarshal() error(%v)", orderID, err)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// RawTicketsByOrderID 通过 order_id 获取 tickets
|
|
func (d *Dao) RawTicketsByOrderID(c context.Context, orderID int64) (res []*model.Ticket, err error) {
|
|
rows, err := d.db.Query(c, _selectTicketByOrderIDSQL, orderID)
|
|
if err != nil {
|
|
log.Error("d.TicketsByOrderID(%v) d.db.Query() error(%v)", orderID, err)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
ticket := &model.Ticket{}
|
|
if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
|
|
&ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
|
|
&ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
|
|
log.Error("d.TicketsByOrderID(%v) rows.Scan() error(%v)", orderID, err)
|
|
return
|
|
}
|
|
res = append(res, ticket)
|
|
}
|
|
return
|
|
}
|
|
|
|
// AddCacheTicketsByOrderID 通过 order_id 获取 tickets 写缓存
|
|
func (d *Dao) AddCacheTicketsByOrderID(c context.Context, orderID int64, tickets []*model.Ticket) (err error) {
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
val, err := json.Marshal(tickets)
|
|
if err != nil {
|
|
log.Error("d.AddCacheTicketsByScreen() error(%v)", err)
|
|
return
|
|
}
|
|
if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyOrderTickets, orderID), model.RedisExpireOneDayTmp, val); err != nil {
|
|
log.Error("d.AddCacheTicketsByOrderID(%d, %+v) error(%v)", orderID, tickets, err)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// CacheTicketsByScreen 通过 screen_id user_id 获取 tickets
|
|
func (d *Dao) CacheTicketsByScreen(c context.Context, screenID int64, UID int64) (res []*model.Ticket, err error) {
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyScreenTickets, screenID, UID)))
|
|
if err != nil {
|
|
if err == redis.ErrNil {
|
|
err = nil
|
|
return
|
|
}
|
|
log.Error("d.CacheTicketsByScreen(%d, %d) error(%v)", screenID, UID, err)
|
|
return
|
|
}
|
|
|
|
if err = json.Unmarshal(reply, &res); err != nil {
|
|
log.Error("d.CacheTicketsByScreen(%d, %d) json.Unmarshal() error(%v)", screenID, UID, err)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// RawTicketsByScreen 通过 screen_id user_id 获取 tickets
|
|
func (d *Dao) RawTicketsByScreen(c context.Context, screenID int64, UID int64) (res []*model.Ticket, err error) {
|
|
rows, err := d.db.Query(c, _selectTicketByScreenIDSQL, screenID, UID, consts.TkTypeDistrib)
|
|
if err != nil {
|
|
log.Error("d.RawTicketsByScreen(%d, %d) error(%v)", screenID, UID, err)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
ticket := &model.Ticket{}
|
|
if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
|
|
&ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
|
|
&ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
|
|
log.Error("d.RawTicketsByScreen(%d, %d) rows.Scan() error(%v)", screenID, UID, err)
|
|
return
|
|
}
|
|
res = append(res, ticket)
|
|
}
|
|
return
|
|
}
|
|
|
|
// AddCacheTicketsByScreen 通过 screen_id user_id 获取 tickets
|
|
func (d *Dao) AddCacheTicketsByScreen(c context.Context, screenID int64, tickets []*model.Ticket, UID int64) (err error) {
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
val, err := json.Marshal(tickets)
|
|
if err != nil {
|
|
log.Error("d.AddCacheTicketsByScreen() error(%v)", err)
|
|
return
|
|
}
|
|
if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyScreenTickets, screenID, UID), model.RedisExpireOneDayTmp, val); err != nil {
|
|
log.Error("d.AddCacheTicketsByScreen(%d, %d, %+v) error(%v)", screenID, UID, tickets, err)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// CacheTicketsByID .
|
|
func (d *Dao) CacheTicketsByID(c context.Context, ticketID []int64) (res map[int64]*model.Ticket, err error) {
|
|
if len(ticketID) == 0 {
|
|
return
|
|
}
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
keys := make([]interface{}, 0)
|
|
for _, ID := range ticketID {
|
|
keys = append(keys, fmt.Sprintf(model.CacheKeyTicket, ID))
|
|
}
|
|
|
|
reply, err := redis.ByteSlices(conn.Do("MGET", keys...))
|
|
if err != nil {
|
|
if err == redis.ErrNil {
|
|
err = nil
|
|
return
|
|
}
|
|
log.Error("d.CacheTicketsByID(%v) conn.Do() error(%v)", ticketID, err)
|
|
return
|
|
}
|
|
|
|
res = make(map[int64]*model.Ticket)
|
|
for _, item := range reply {
|
|
if len(item) == 0 {
|
|
continue
|
|
}
|
|
ticket := &model.Ticket{}
|
|
if err = json.Unmarshal(item, ticket); err != nil {
|
|
log.Error("d.CacheTicketsByID(%v) json.Unmarshal(%s) error(%v)", ticketID, item, err)
|
|
continue
|
|
}
|
|
res[ticket.ID] = ticket
|
|
}
|
|
return
|
|
}
|
|
|
|
// RawTicketsByID .
|
|
func (d *Dao) RawTicketsByID(c context.Context, ticketID []int64) (res map[int64]*model.Ticket, err error) {
|
|
if len(ticketID) == 0 {
|
|
return
|
|
}
|
|
|
|
rows, err := d.db.Query(c, fmt.Sprintf(_selectTicketByIDSQL, xstr.JoinInts(ticketID)))
|
|
if err != nil {
|
|
log.Error("d.RawTicketsByID(%v) d.db.Query() error(%v)", ticketID, err)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
res = make(map[int64]*model.Ticket)
|
|
for rows.Next() {
|
|
ticket := &model.Ticket{}
|
|
if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
|
|
&ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
|
|
&ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
|
|
log.Error("d.RawTicketsByID(%v) rows.Scan() error(%v)", ticketID, err)
|
|
return
|
|
}
|
|
res[ticket.ID] = ticket
|
|
}
|
|
return
|
|
}
|
|
|
|
// AddCacheTicketsByID .
|
|
func (d *Dao) AddCacheTicketsByID(c context.Context, tickets map[int64]*model.Ticket) (err error) {
|
|
if len(tickets) == 0 {
|
|
return
|
|
}
|
|
conn := d.redis.Get(c)
|
|
defer func() {
|
|
conn.Flush()
|
|
conn.Close()
|
|
}()
|
|
|
|
args := make([]interface{}, 0)
|
|
for ID, ticket := range tickets {
|
|
var b []byte
|
|
if b, err = json.Marshal(ticket); err != nil {
|
|
log.Error("d.AddCacheTicketsByID(%+v) json.Marshal(%+v) error(%v)", tickets, ticket, err)
|
|
continue
|
|
}
|
|
args = append(args, fmt.Sprintf(model.CacheKeyTicket, ID), b)
|
|
}
|
|
|
|
if err = conn.Send("MSET", args...); err != nil {
|
|
log.Error("d.AddCacheTicketsByID(%+v) conn.Send() error(%v)", tickets, err)
|
|
return
|
|
}
|
|
for ID := range tickets {
|
|
conn.Send("EXPIRE", fmt.Sprintf(model.CacheKeyTicket, ID), model.RedisExpireTenMinTmp)
|
|
}
|
|
return
|
|
}
|
|
|
|
// UpdateTicketStatus 更新票状态
|
|
func (d *Dao) UpdateTicketStatus(c context.Context, status int16, ticketID ...int64) (err error) {
|
|
if len(ticketID) == 0 {
|
|
return
|
|
}
|
|
if _, err = d.db.Exec(c, fmt.Sprintf(_updateTicketStatusSQL, xstr.JoinInts(ticketID)), status); err != nil {
|
|
log.Error("d.UpdateTicketStatus(%d, %v) error(%v)", status, ticketID, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
// DelTicketCache 删除单张电子票全部 cache
|
|
func (d *Dao) DelTicketCache(c context.Context, tickets ...*model.Ticket) (err error) {
|
|
if len(tickets) == 0 {
|
|
return
|
|
}
|
|
var keys []interface{}
|
|
for _, ticket := range tickets {
|
|
keys = append(
|
|
keys,
|
|
fmt.Sprintf(model.CacheKeyOrderTickets, ticket.OID),
|
|
fmt.Sprintf(model.CacheKeyScreenTickets, ticket.SID, ticket.UID),
|
|
fmt.Sprintf(model.CacheKeyTicket, ticket.ID),
|
|
fmt.Sprintf(model.CacheKeyTicketQr, ticket.Qr),
|
|
)
|
|
}
|
|
if err = d.RedisDel(c, keys...); err != nil {
|
|
log.Error("d.DelTicketCache() d.RedisDel(%v) error(%v)", keys, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
// CacheTicketSend .
|
|
func (d *Dao) CacheTicketSend(c context.Context, IDs []int64, TIDType string) (res map[int64]*model.TicketSend, err error) {
|
|
var cacheKey string
|
|
switch TIDType {
|
|
case consts.TIDTypeSend:
|
|
cacheKey = model.CacheKeyTicketSend
|
|
case consts.TIDTypeRecv:
|
|
cacheKey = model.CacheKeyTicketRecv
|
|
default:
|
|
return
|
|
}
|
|
conn := d.redis.Get(c)
|
|
defer conn.Close()
|
|
|
|
keys := make([]interface{}, 0)
|
|
for _, ID := range IDs {
|
|
keys = append(keys, fmt.Sprintf(cacheKey, ID))
|
|
}
|
|
reply, err := redis.ByteSlices(conn.Do("MGET", keys...))
|
|
if err != nil {
|
|
if err == redis.ErrNil {
|
|
err = nil
|
|
return
|
|
}
|
|
log.Error("d.CacheTicketSend(%v, %s) conn.Do() error(%v)", IDs, TIDType, err)
|
|
return
|
|
}
|
|
|
|
res = make(map[int64]*model.TicketSend)
|
|
for _, item := range reply {
|
|
if len(item) == 0 {
|
|
continue
|
|
}
|
|
tmp := &model.TicketSend{}
|
|
if err = json.Unmarshal(item, tmp); err != nil {
|
|
log.Error("d.CacheTicketSend() json.Unmarshal(%s) error(%v)", item, err)
|
|
continue
|
|
}
|
|
switch TIDType {
|
|
case consts.TIDTypeSend:
|
|
res[tmp.SendTID] = tmp
|
|
case consts.TIDTypeRecv:
|
|
res[tmp.RecvTID] = tmp
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// RawTicketSend .
|
|
func (d *Dao) RawTicketSend(c context.Context, IDs []int64, TIDType string) (res map[int64]*model.TicketSend, err error) {
|
|
if len(IDs) == 0 {
|
|
return
|
|
}
|
|
var sql string
|
|
switch TIDType {
|
|
case consts.TIDTypeSend:
|
|
sql = _selectTicketSendBySendTID
|
|
case consts.TIDTypeRecv:
|
|
sql = _selectTicketSendByRecvTID
|
|
default:
|
|
return
|
|
}
|
|
rows, err := d.db.Query(c, fmt.Sprintf(sql, xstr.JoinInts(IDs)))
|
|
if err != nil {
|
|
log.Error("d.RawTicketSend(%v, %s) d.db.Query() error(%v)", IDs, TIDType, err)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
res = make(map[int64]*model.TicketSend)
|
|
for rows.Next() {
|
|
tmp := &model.TicketSend{}
|
|
if err = rows.Scan(&tmp.ID, &tmp.SID, &tmp.SendTID, &tmp.RecvTID, &tmp.SendUID, &tmp.RecvUID, &tmp.RecvTel, &tmp.Status, &tmp.CTime, &tmp.MTime, &tmp.OID); err != nil {
|
|
log.Error("d.RawTicketSend(%v, %s) rows.Scan() error(%v)", IDs, TIDType, err)
|
|
return
|
|
}
|
|
switch TIDType {
|
|
case consts.TIDTypeSend:
|
|
res[tmp.SendTID] = tmp
|
|
case consts.TIDTypeRecv:
|
|
res[tmp.RecvTID] = tmp
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// AddCacheTicketSend .
|
|
func (d *Dao) AddCacheTicketSend(c context.Context, tsMap map[int64]*model.TicketSend, TIDType string) (err error) {
|
|
var cacheKey string
|
|
switch TIDType {
|
|
case consts.TIDTypeSend:
|
|
cacheKey = model.CacheKeyTicketSend
|
|
case consts.TIDTypeRecv:
|
|
cacheKey = model.CacheKeyTicketRecv
|
|
default:
|
|
return
|
|
}
|
|
|
|
conn := d.redis.Get(c)
|
|
defer func() {
|
|
conn.Flush()
|
|
conn.Close()
|
|
}()
|
|
|
|
var args []interface{}
|
|
for ID, item := range tsMap {
|
|
var b []byte
|
|
if b, err = json.Marshal(item); err != nil {
|
|
log.Error("d.AddCacheTicketSend(%v, %s), json.Marshal(%s) error(%v)", tsMap, TIDType, b, err)
|
|
continue
|
|
}
|
|
args = append(args, fmt.Sprintf(cacheKey, ID), b)
|
|
}
|
|
|
|
if err = conn.Send("MSET", args...); err != nil {
|
|
log.Error("d.AddCacheTicketsByID(%+v) conn.Send() error(%v)", tsMap, err)
|
|
return
|
|
}
|
|
for ID := range tsMap {
|
|
conn.Send("EXPIRE", fmt.Sprintf(cacheKey, ID), model.RedisExpireOneDayTmp)
|
|
}
|
|
return
|
|
}
|