2019-04-22 02:59:20 +00:00

417 lines
12 KiB
Go

package dao
import (
"context"
"encoding/json"
"fmt"
"strings"
"go-common/app/service/openplatform/ticket-sales/model"
"go-common/app/service/openplatform/ticket-sales/model/consts"
"go-common/library/cache/redis"
"go-common/library/log"
"go-common/library/xstr"
)
//票号相关常量
const (
sqlCountRefundTicket = "SELECT COUNT(*) FROM ticket WHERE oid IN (%s) AND refund_status=?"
_selectTicketByOrderIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE oid=?"
_selectTicketByScreenIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE sid=? AND uid=? AND type != ?"
_selectTicketByIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE id IN (%s)"
_updateTicketStatusSQL = "UPDATE ticket set status=? WHERE id IN (%s)"
_selectTicketSendBySendTID = "SELECT id, sid, send_tid, recv_tid, send_uid, recv_uid, recv_tel, status, ctime, mtime, oid FROM ticket_send WHERE send_tid IN (%s)"
_selectTicketSendByRecvTID = "SELECT id, sid, send_tid, recv_tid, send_uid, recv_uid, recv_tel, status, ctime, mtime, oid FROM ticket_send WHERE recv_tid IN (%s)"
)
//rawRefundTicketCnt 统计用户已退票数
func (d *Dao) rawRefundTicketCnt(ctx context.Context, oids []int64) (cnt int64, err error) {
lo := len(oids)
if lo == 0 {
return
}
q := fmt.Sprintf(sqlCountRefundTicket, strings.Repeat(",?", lo)[1:])
a := make([]interface{}, lo+1)
a[lo] = consts.TkStatusRefunded
for k, v := range oids {
a[k] = v
}
err = d.db.QueryRow(ctx, q, a...).Scan(&cnt)
return
}
// CacheTicketsByOrderID 通过 order_id 获取 tickets 取缓存
func (d *Dao) CacheTicketsByOrderID(c context.Context, orderID int64) (res []*model.Ticket, err error) {
conn := d.redis.Get(c)
defer conn.Close()
reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyOrderTickets, orderID)))
if err != nil {
if err == redis.ErrNil {
err = nil
return
}
log.Error("d.CacheTicketsByOrderID(%d) error(%v)", orderID, err)
return
}
if err = json.Unmarshal(reply, &res); err != nil {
log.Error("d.CacheTicketsByOrderID(%d) json.Unmarshal() error(%v)", orderID, err)
return
}
return
}
// RawTicketsByOrderID 通过 order_id 获取 tickets
func (d *Dao) RawTicketsByOrderID(c context.Context, orderID int64) (res []*model.Ticket, err error) {
rows, err := d.db.Query(c, _selectTicketByOrderIDSQL, orderID)
if err != nil {
log.Error("d.TicketsByOrderID(%v) d.db.Query() error(%v)", orderID, err)
return
}
defer rows.Close()
for rows.Next() {
ticket := &model.Ticket{}
if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
&ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
&ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
log.Error("d.TicketsByOrderID(%v) rows.Scan() error(%v)", orderID, err)
return
}
res = append(res, ticket)
}
return
}
// AddCacheTicketsByOrderID 通过 order_id 获取 tickets 写缓存
func (d *Dao) AddCacheTicketsByOrderID(c context.Context, orderID int64, tickets []*model.Ticket) (err error) {
conn := d.redis.Get(c)
defer conn.Close()
val, err := json.Marshal(tickets)
if err != nil {
log.Error("d.AddCacheTicketsByScreen() error(%v)", err)
return
}
if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyOrderTickets, orderID), model.RedisExpireOneDayTmp, val); err != nil {
log.Error("d.AddCacheTicketsByOrderID(%d, %+v) error(%v)", orderID, tickets, err)
return
}
return
}
// CacheTicketsByScreen 通过 screen_id user_id 获取 tickets
func (d *Dao) CacheTicketsByScreen(c context.Context, screenID int64, UID int64) (res []*model.Ticket, err error) {
conn := d.redis.Get(c)
defer conn.Close()
reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyScreenTickets, screenID, UID)))
if err != nil {
if err == redis.ErrNil {
err = nil
return
}
log.Error("d.CacheTicketsByScreen(%d, %d) error(%v)", screenID, UID, err)
return
}
if err = json.Unmarshal(reply, &res); err != nil {
log.Error("d.CacheTicketsByScreen(%d, %d) json.Unmarshal() error(%v)", screenID, UID, err)
return
}
return
}
// RawTicketsByScreen 通过 screen_id user_id 获取 tickets
func (d *Dao) RawTicketsByScreen(c context.Context, screenID int64, UID int64) (res []*model.Ticket, err error) {
rows, err := d.db.Query(c, _selectTicketByScreenIDSQL, screenID, UID, consts.TkTypeDistrib)
if err != nil {
log.Error("d.RawTicketsByScreen(%d, %d) error(%v)", screenID, UID, err)
return
}
defer rows.Close()
for rows.Next() {
ticket := &model.Ticket{}
if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
&ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
&ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
log.Error("d.RawTicketsByScreen(%d, %d) rows.Scan() error(%v)", screenID, UID, err)
return
}
res = append(res, ticket)
}
return
}
// AddCacheTicketsByScreen 通过 screen_id user_id 获取 tickets
func (d *Dao) AddCacheTicketsByScreen(c context.Context, screenID int64, tickets []*model.Ticket, UID int64) (err error) {
conn := d.redis.Get(c)
defer conn.Close()
val, err := json.Marshal(tickets)
if err != nil {
log.Error("d.AddCacheTicketsByScreen() error(%v)", err)
return
}
if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyScreenTickets, screenID, UID), model.RedisExpireOneDayTmp, val); err != nil {
log.Error("d.AddCacheTicketsByScreen(%d, %d, %+v) error(%v)", screenID, UID, tickets, err)
return
}
return
}
// CacheTicketsByID .
func (d *Dao) CacheTicketsByID(c context.Context, ticketID []int64) (res map[int64]*model.Ticket, err error) {
if len(ticketID) == 0 {
return
}
conn := d.redis.Get(c)
defer conn.Close()
keys := make([]interface{}, 0)
for _, ID := range ticketID {
keys = append(keys, fmt.Sprintf(model.CacheKeyTicket, ID))
}
reply, err := redis.ByteSlices(conn.Do("MGET", keys...))
if err != nil {
if err == redis.ErrNil {
err = nil
return
}
log.Error("d.CacheTicketsByID(%v) conn.Do() error(%v)", ticketID, err)
return
}
res = make(map[int64]*model.Ticket)
for _, item := range reply {
if len(item) == 0 {
continue
}
ticket := &model.Ticket{}
if err = json.Unmarshal(item, ticket); err != nil {
log.Error("d.CacheTicketsByID(%v) json.Unmarshal(%s) error(%v)", ticketID, item, err)
continue
}
res[ticket.ID] = ticket
}
return
}
// RawTicketsByID .
func (d *Dao) RawTicketsByID(c context.Context, ticketID []int64) (res map[int64]*model.Ticket, err error) {
if len(ticketID) == 0 {
return
}
rows, err := d.db.Query(c, fmt.Sprintf(_selectTicketByIDSQL, xstr.JoinInts(ticketID)))
if err != nil {
log.Error("d.RawTicketsByID(%v) d.db.Query() error(%v)", ticketID, err)
return
}
defer rows.Close()
res = make(map[int64]*model.Ticket)
for rows.Next() {
ticket := &model.Ticket{}
if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
&ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
&ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
log.Error("d.RawTicketsByID(%v) rows.Scan() error(%v)", ticketID, err)
return
}
res[ticket.ID] = ticket
}
return
}
// AddCacheTicketsByID .
func (d *Dao) AddCacheTicketsByID(c context.Context, tickets map[int64]*model.Ticket) (err error) {
if len(tickets) == 0 {
return
}
conn := d.redis.Get(c)
defer func() {
conn.Flush()
conn.Close()
}()
args := make([]interface{}, 0)
for ID, ticket := range tickets {
var b []byte
if b, err = json.Marshal(ticket); err != nil {
log.Error("d.AddCacheTicketsByID(%+v) json.Marshal(%+v) error(%v)", tickets, ticket, err)
continue
}
args = append(args, fmt.Sprintf(model.CacheKeyTicket, ID), b)
}
if err = conn.Send("MSET", args...); err != nil {
log.Error("d.AddCacheTicketsByID(%+v) conn.Send() error(%v)", tickets, err)
return
}
for ID := range tickets {
conn.Send("EXPIRE", fmt.Sprintf(model.CacheKeyTicket, ID), model.RedisExpireTenMinTmp)
}
return
}
// UpdateTicketStatus 更新票状态
func (d *Dao) UpdateTicketStatus(c context.Context, status int16, ticketID ...int64) (err error) {
if len(ticketID) == 0 {
return
}
if _, err = d.db.Exec(c, fmt.Sprintf(_updateTicketStatusSQL, xstr.JoinInts(ticketID)), status); err != nil {
log.Error("d.UpdateTicketStatus(%d, %v) error(%v)", status, ticketID, err)
}
return
}
// DelTicketCache 删除单张电子票全部 cache
func (d *Dao) DelTicketCache(c context.Context, tickets ...*model.Ticket) (err error) {
if len(tickets) == 0 {
return
}
var keys []interface{}
for _, ticket := range tickets {
keys = append(
keys,
fmt.Sprintf(model.CacheKeyOrderTickets, ticket.OID),
fmt.Sprintf(model.CacheKeyScreenTickets, ticket.SID, ticket.UID),
fmt.Sprintf(model.CacheKeyTicket, ticket.ID),
fmt.Sprintf(model.CacheKeyTicketQr, ticket.Qr),
)
}
if err = d.RedisDel(c, keys...); err != nil {
log.Error("d.DelTicketCache() d.RedisDel(%v) error(%v)", keys, err)
}
return
}
// CacheTicketSend .
func (d *Dao) CacheTicketSend(c context.Context, IDs []int64, TIDType string) (res map[int64]*model.TicketSend, err error) {
var cacheKey string
switch TIDType {
case consts.TIDTypeSend:
cacheKey = model.CacheKeyTicketSend
case consts.TIDTypeRecv:
cacheKey = model.CacheKeyTicketRecv
default:
return
}
conn := d.redis.Get(c)
defer conn.Close()
keys := make([]interface{}, 0)
for _, ID := range IDs {
keys = append(keys, fmt.Sprintf(cacheKey, ID))
}
reply, err := redis.ByteSlices(conn.Do("MGET", keys...))
if err != nil {
if err == redis.ErrNil {
err = nil
return
}
log.Error("d.CacheTicketSend(%v, %s) conn.Do() error(%v)", IDs, TIDType, err)
return
}
res = make(map[int64]*model.TicketSend)
for _, item := range reply {
if len(item) == 0 {
continue
}
tmp := &model.TicketSend{}
if err = json.Unmarshal(item, tmp); err != nil {
log.Error("d.CacheTicketSend() json.Unmarshal(%s) error(%v)", item, err)
continue
}
switch TIDType {
case consts.TIDTypeSend:
res[tmp.SendTID] = tmp
case consts.TIDTypeRecv:
res[tmp.RecvTID] = tmp
default:
return
}
}
return
}
// RawTicketSend .
func (d *Dao) RawTicketSend(c context.Context, IDs []int64, TIDType string) (res map[int64]*model.TicketSend, err error) {
if len(IDs) == 0 {
return
}
var sql string
switch TIDType {
case consts.TIDTypeSend:
sql = _selectTicketSendBySendTID
case consts.TIDTypeRecv:
sql = _selectTicketSendByRecvTID
default:
return
}
rows, err := d.db.Query(c, fmt.Sprintf(sql, xstr.JoinInts(IDs)))
if err != nil {
log.Error("d.RawTicketSend(%v, %s) d.db.Query() error(%v)", IDs, TIDType, err)
return
}
defer rows.Close()
res = make(map[int64]*model.TicketSend)
for rows.Next() {
tmp := &model.TicketSend{}
if err = rows.Scan(&tmp.ID, &tmp.SID, &tmp.SendTID, &tmp.RecvTID, &tmp.SendUID, &tmp.RecvUID, &tmp.RecvTel, &tmp.Status, &tmp.CTime, &tmp.MTime, &tmp.OID); err != nil {
log.Error("d.RawTicketSend(%v, %s) rows.Scan() error(%v)", IDs, TIDType, err)
return
}
switch TIDType {
case consts.TIDTypeSend:
res[tmp.SendTID] = tmp
case consts.TIDTypeRecv:
res[tmp.RecvTID] = tmp
default:
return
}
}
return
}
// AddCacheTicketSend .
func (d *Dao) AddCacheTicketSend(c context.Context, tsMap map[int64]*model.TicketSend, TIDType string) (err error) {
var cacheKey string
switch TIDType {
case consts.TIDTypeSend:
cacheKey = model.CacheKeyTicketSend
case consts.TIDTypeRecv:
cacheKey = model.CacheKeyTicketRecv
default:
return
}
conn := d.redis.Get(c)
defer func() {
conn.Flush()
conn.Close()
}()
var args []interface{}
for ID, item := range tsMap {
var b []byte
if b, err = json.Marshal(item); err != nil {
log.Error("d.AddCacheTicketSend(%v, %s), json.Marshal(%s) error(%v)", tsMap, TIDType, b, err)
continue
}
args = append(args, fmt.Sprintf(cacheKey, ID), b)
}
if err = conn.Send("MSET", args...); err != nil {
log.Error("d.AddCacheTicketsByID(%+v) conn.Send() error(%v)", tsMap, err)
return
}
for ID := range tsMap {
conn.Send("EXPIRE", fmt.Sprintf(cacheKey, ID), model.RedisExpireOneDayTmp)
}
return
}