server.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package gin
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"

	"github.com/gin-contrib/cors"
	"github.com/gin-gonic/gin"
	"github.com/gogf/gf/encoding/gbase64"
	"github.com/gogf/gf/encoding/gjson"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/util/gconv"
	. "github.com/rpcxio/rpcx-gateway"
)
  15. const (
  16. DS_DT_Excel = "DS-DT-Excel" // 数据类型:Excel数据流
  17. )
  18. type Server struct {
  19. addr string
  20. g *gin.Engine
  21. }
  22. type ExcelDataResp struct {
  23. Filename string `json:"filename,omitempty"`
  24. Data []byte
  25. }
  26. // New returns a server.
  27. func New(addr string) *Server {
  28. return &Server{
  29. addr: addr,
  30. }
  31. }
  32. // NewWithGin returns a server with preconfigured gin.
  33. func NewWithGin(addr string, g *gin.Engine) *Server {
  34. return &Server{
  35. addr: addr,
  36. g: g,
  37. }
  38. }
  39. // RegisterHandler configures the handler to handle http rpcx invoke.
  40. // It wraps ServiceHandler into httprouter.Handle.
  41. func (s *Server) RegisterHandler(base string, handler ServiceHandler) {
  42. g := s.g
  43. if g == nil {
  44. gin.SetMode(gin.ReleaseMode) // 设备为生产模式 add by sunmiao
  45. g = gin.Default()
  46. // 自定义日志输出
  47. g.Use(gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
  48. return fmt.Sprintf("%s [%s] %s%s %s\n",
  49. param.ClientIP, // 客户端IP
  50. param.Request.Header.Get("Tenant"), // 租户码
  51. param.Path, // 请求路径
  52. "/"+param.Request.Header.Get("X-RPCX-ServicePath")+"/"+param.Request.Header.Get("X-RPCX-ServiceMethod"),
  53. //param.StatusCode, // 请求状态码
  54. //param.Latency, // 请求时长
  55. param.ErrorMessage,
  56. )
  57. }))
  58. // 添加CORS处理 add by sunmiao
  59. config := cors.DefaultConfig()
  60. config.AllowAllOrigins = true
  61. config.AllowHeaders = []string{"Origin", "Content-Length", "Content-Type",
  62. "Authorization", "Tenant", "X-RPCX-SerializeType", "X-RPCX-ServiceMethod", "SrvEnv",
  63. "X-RPCX-ServicePath", "X-RPCX-Meta"}
  64. g.Use(cors.New(config))
  65. }
  66. h := wrapServiceHandler(handler)
  67. g.POST(base, h)
  68. // 只开放POST edit by sunmiao
  69. //g.GET(base, h)
  70. //g.PUT(base, h)
  71. s.g = g
  72. }
  73. func wrapServiceHandler(handler ServiceHandler) gin.HandlerFunc {
  74. return func(ctx *gin.Context) {
  75. r := ctx.Request
  76. w := ctx.Writer
  77. if r.Header.Get(XServicePath) == "" {
  78. servicePath := ctx.Param("servicePath")
  79. if strings.HasPrefix(servicePath, "/") {
  80. servicePath = servicePath[1:]
  81. }
  82. r.Header.Set(XServicePath, servicePath)
  83. }
  84. servicePath := r.Header.Get(XServicePath)
  85. messageID := r.Header.Get(XMessageID)
  86. wh := w.Header()
  87. if messageID != "" {
  88. wh.Set(XMessageID, messageID)
  89. }
  90. xmeta := ""
  91. // 传递租户码
  92. tenant := r.Header.Get("Tenant")
  93. if tenant != "" {
  94. //r.Header.Set(XMeta, "tenant="+tenant)
  95. xmeta = "tenant=" + tenant
  96. }
  97. serviceMethod := r.Header.Get(XServiceMethod)
  98. if serviceMethod != "" {
  99. if xmeta != "" {
  100. xmeta = xmeta + "&"
  101. }
  102. xmeta = xmeta + "reqMethod=" + servicePath + "." + serviceMethod
  103. }
  104. // 传递ClientIP和UserAgent
  105. x_Meta := r.Header.Get("X-RPCX-Meta")
  106. if x_Meta != "" && strings.Contains(x_Meta, "need_clint_Info=1") {
  107. clientIP := ctx.ClientIP()
  108. userAgent := gbase64.EncodeString(r.UserAgent())
  109. if xmeta != "" {
  110. xmeta = xmeta + "&"
  111. }
  112. xmeta = xmeta + "clientIP=" + clientIP + "&userAgent=" + userAgent
  113. }
  114. // 追加原始 X-RPCX-Meta
  115. if xmeta != "" {
  116. if x_Meta != "" {
  117. xmeta = x_Meta + "&" + xmeta
  118. }
  119. } else {
  120. xmeta = x_Meta
  121. }
  122. r.Header.Set(XMeta, xmeta)
  123. meta, payload, err := handler(r, servicePath)
  124. for k, v := range meta {
  125. wh.Set(k, v)
  126. }
  127. // todo 添加日志收集
  128. reqData := g.Map{}
  129. ctx.ShouldBindJSON(&reqData)
  130. logInfo := g.Map{"reqHeader": r.Header, "reqData": reqData, "resMeta": meta, "resData": gjson.New(payload)}
  131. if err != nil {
  132. logInfo["err"] = err
  133. g.Log().Error(logInfo)
  134. } else {
  135. g.Log().Info(logInfo)
  136. }
  137. if err == nil {
  138. // Excel文件流下载
  139. if v, ok := meta[XMeta]; ok {
  140. metadata, err := url.ParseQuery(v)
  141. if err != nil {
  142. resp := errorJson(500, err.Error())
  143. ctx.JSON(http.StatusOK, resp)
  144. return
  145. }
  146. if _, ok := metadata[DS_DT_Excel]; ok {
  147. excelData := ExcelDataResp{
  148. Filename: "temp.xlsx",
  149. }
  150. err := gconv.Struct(payload, &excelData)
  151. if err != nil {
  152. resp := errorJson(500, err.Error())
  153. ctx.JSON(http.StatusOK, resp)
  154. return
  155. }
  156. wh.Set("content-disposition", "attachment; filename="+excelData.Filename)
  157. wh.Set("Access-Control-Expose-Headers", "Content-Disposition")
  158. ctx.Data(http.StatusOK, "application/vnd.ms-excel", excelData.Data)
  159. return
  160. }
  161. }
  162. // 普通开发环境改为json格式
  163. env := r.Header.Get("SrvEnv")
  164. if env == "dev" {
  165. ctx.Data(http.StatusOK, "application/json; charset=UTF-8", payload)
  166. } else {
  167. // 常规数据流
  168. ctx.Data(http.StatusOK, "application/octet-stream; charset=UTF-8", payload)
  169. }
  170. return
  171. }
  172. rh := r.Header
  173. for k, v := range rh {
  174. if strings.HasPrefix(k, "X-RPCX-") && len(v) > 0 {
  175. wh.Set(k, v[0])
  176. }
  177. }
  178. //wh.Set(XMessageStatusType, "Error")
  179. //wh.Set(XErrorMessage, err.Error())
  180. //ctx.String(http.StatusOK, err.Error())
  181. resp := errorJson(500, err.Error())
  182. if err.Error() == "InvalidToken" {
  183. resp.Code = 401
  184. }
  185. ctx.JSON(http.StatusOK, resp)
  186. }
  187. }
  188. func (s *Server) Serve() error {
  189. return s.g.Run(s.addr)
  190. }
  191. // 数据返回通用JSON数据结构
  192. type JsonResponse struct {
  193. Code int `json:"code,omitempty"` // 错误码((200:成功, 其他是异常)
  194. Msg string `json:"msg,omitempty"` // 提示信息
  195. Data interface{} `json:"data,omitempty"` // 返回数据(业务接口定义具体数据结构)
  196. }
  197. func errorJson(code int, message string, data ...interface{}) JsonResponse {
  198. responseData := interface{}(nil)
  199. if len(data) > 0 {
  200. responseData = data[0]
  201. }
  202. jsonData := JsonResponse{
  203. Code: code,
  204. Msg: message,
  205. Data: responseData,
  206. }
  207. return jsonData
  208. }