converter.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package gateway
  2. import (
  3. "bytes"
  4. "errors"
  5. "io/ioutil"
  6. "mime/multipart"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "github.com/smallnest/rpcx/protocol"
  12. )
  13. const (
  14. XVersion = "X-RPCX-Version"
  15. XMessageType = "X-RPCX-MesssageType"
  16. XHeartbeat = "X-RPCX-Heartbeat"
  17. XOneway = "X-RPCX-Oneway"
  18. XMessageStatusType = "X-RPCX-MessageStatusType"
  19. XSerializeType = "X-RPCX-SerializeType"
  20. XMessageID = "X-RPCX-MessageID"
  21. XServicePath = "X-RPCX-ServicePath"
  22. XServiceMethod = "X-RPCX-ServiceMethod"
  23. XMeta = "X-RPCX-Meta"
  24. XErrorMessage = "X-RPCX-ErrorMessage"
  25. )
  26. func HttpRequest2RpcxRequest(r *http.Request) (*protocol.Message, error) {
  27. req := protocol.NewMessage()
  28. req.SetMessageType(protocol.Request)
  29. h := r.Header
  30. seq := getRpcxHeader(r, XMessageID)
  31. if seq != "" {
  32. id, err := strconv.ParseUint(seq, 10, 64)
  33. if err != nil {
  34. return nil, err
  35. }
  36. req.SetSeq(id)
  37. }
  38. heartbeat := getRpcxHeader(r, XHeartbeat)
  39. if heartbeat != "" {
  40. req.SetHeartbeat(true)
  41. }
  42. oneway := getRpcxHeader(r, XOneway)
  43. if oneway != "" {
  44. req.SetOneway(true)
  45. }
  46. if h.Get("Content-Encoding") == "gzip" {
  47. req.SetCompressType(protocol.Gzip)
  48. }
  49. st := getRpcxHeader(r, XSerializeType)
  50. if st != "" {
  51. rst, err := strconv.Atoi(st)
  52. if err != nil {
  53. return nil, err
  54. }
  55. req.SetSerializeType(protocol.SerializeType(rst))
  56. } else {
  57. return nil, errors.New("empty serialized type")
  58. }
  59. meta := getRpcxHeader(r, XMeta)
  60. if meta != "" {
  61. metadata, err := url.ParseQuery(meta)
  62. if err != nil {
  63. return nil, err
  64. }
  65. mm := make(map[string]string)
  66. for k, v := range metadata {
  67. if len(v) > 0 {
  68. mm[k] = v[0]
  69. }
  70. }
  71. req.Metadata = mm
  72. }
  73. req.Metadata = getUrlParams(r, req.Metadata)
  74. sp := getRpcxHeader(r, XServicePath)
  75. if sp != "" {
  76. req.ServicePath = sp
  77. } else {
  78. return nil, errors.New("empty servicepath")
  79. }
  80. sm := getRpcxHeader(r, XServiceMethod)
  81. if sm != "" {
  82. req.ServiceMethod = sm
  83. } else {
  84. return nil, errors.New("empty servicemethod")
  85. }
  86. payload, err := ioutil.ReadAll(r.Body)
  87. if err != nil {
  88. return nil, err
  89. }
  90. req.Payload = payload
  91. // Request.Body的读取, 读取数据时, 指针会对应移动至EOF, 所以下次读取的时候, seek指针还在EOF处
  92. // 后续无法获取请求数据,在此读完又存
  93. r.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
  94. return req, nil
  95. }
  96. func MultipartRequest2RpcxRequest(r *http.Request) (map[string]string, *multipart.FileHeader, error) {
  97. r.ParseMultipartForm(10 << 20) //10mb
  98. form := r.MultipartForm
  99. formValues := make(map[string]string)
  100. //获取 multi-part/form header的 value
  101. h := r.Header
  102. meta := h.Get(XMeta)
  103. if meta != "" {
  104. metadata, err := url.ParseQuery(meta)
  105. if err != nil {
  106. return nil, nil, err
  107. }
  108. for k, v := range metadata {
  109. if len(v) > 0 {
  110. formValues[k] = v[0]
  111. }
  112. }
  113. }
  114. //获取 multi-part/form body中的form value
  115. for k, v := range form.Value {
  116. formValues[k] = v[0]
  117. }
  118. file := form.File["file"][0]
  119. formValues["fileName"] = file.Filename
  120. formValues["fileSize"] = strconv.FormatInt(file.Size, 10)
  121. sp := h.Get(XServicePath)
  122. if sp != "" {
  123. formValues["reqService"] = sp
  124. } else {
  125. return nil, nil, errors.New("empty servicepath")
  126. }
  127. sm := h.Get(XServiceMethod)
  128. if sm != "" {
  129. formValues["reqMethod"] = sm
  130. } else {
  131. return nil, nil, errors.New("empty servicemethod")
  132. }
  133. formValues["authExclude"] = "false"
  134. return formValues, form.File["file"][0], nil
  135. }
  136. func getRpcxHeader(r *http.Request, key string) string {
  137. val := r.Header.Get(key)
  138. if val != "" {
  139. return val
  140. } else {
  141. if values, ok := r.URL.Query()[key]; ok && len(values) > 0 {
  142. return values[0]
  143. } else {
  144. return ""
  145. }
  146. }
  147. }
  148. func getUrlParams(r *http.Request, metadata map[string]string) map[string]string {
  149. if len(metadata) == 0 {
  150. metadata = make(map[string]string)
  151. }
  152. query := r.URL.Query()
  153. for k, v := range query {
  154. if !strings.HasPrefix(k, "X-RPCX-") && len(v) > 0 {
  155. metadata[k] = v[0]
  156. }
  157. }
  158. return metadata
  159. }
  160. // func RpcxResponse2HttpResponse(res *protocol.Message) (url.Values, []byte, error) {
  161. // m := make(url.Values)
  162. // m.Set(XVersion, strconv.Itoa(int(res.Version())))
  163. // if res.IsHeartbeat() {
  164. // m.Set(XHeartbeat, "true")
  165. // }
  166. // if res.IsOneway() {
  167. // m.Set(XOneway, "true")
  168. // }
  169. // if res.MessageStatusType() == protocol.Error {
  170. // m.Set(XMessageStatusType, "Error")
  171. // } else {
  172. // m.Set(XMessageStatusType, "Normal")
  173. // }
  174. // if res.CompressType() == protocol.Gzip {
  175. // m.Set("Content-Encoding", "gzip")
  176. // }
  177. // m.Set(XSerializeType, strconv.Itoa(int(res.SerializeType())))
  178. // m.Set(XMessageID, strconv.FormatUint(res.Seq(), 10))
  179. // m.Set(XServicePath, res.ServicePath)
  180. // m.Set(XServiceMethod, res.ServiceMethod)
  181. // return m, res.Payload, nil
  182. // }