converter.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package gateway
  2. import (
  3. "bytes"
  4. "errors"
  5. "io/ioutil"
  6. "mime/multipart"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "github.com/smallnest/rpcx/protocol"
  12. )
  13. const (
  14. XVersion = "X-RPCX-Version"
  15. XMessageType = "X-RPCX-MesssageType"
  16. XHeartbeat = "X-RPCX-Heartbeat"
  17. XOneway = "X-RPCX-Oneway"
  18. XMessageStatusType = "X-RPCX-MessageStatusType"
  19. XSerializeType = "X-RPCX-SerializeType"
  20. XMessageID = "X-RPCX-MessageID"
  21. XServicePath = "X-RPCX-ServicePath"
  22. XServiceMethod = "X-RPCX-ServiceMethod"
  23. XMeta = "X-RPCX-Meta"
  24. XErrorMessage = "X-RPCX-ErrorMessage"
  25. )
  26. func HttpRequest2RpcxRequest(r *http.Request) (*protocol.Message, error) {
  27. req := protocol.NewMessage()
  28. req.SetMessageType(protocol.Request)
  29. h := r.Header
  30. seq := getRpcxHeader(r, XMessageID)
  31. if seq != "" {
  32. id, err := strconv.ParseUint(seq, 10, 64)
  33. if err != nil {
  34. return nil, err
  35. }
  36. req.SetSeq(id)
  37. }
  38. heartbeat := getRpcxHeader(r, XHeartbeat)
  39. if heartbeat != "" {
  40. req.SetHeartbeat(true)
  41. }
  42. oneway := getRpcxHeader(r, XOneway)
  43. if oneway != "" {
  44. req.SetOneway(true)
  45. }
  46. if h.Get("Content-Encoding") == "gzip" {
  47. req.SetCompressType(protocol.Gzip)
  48. }
  49. st := getRpcxHeader(r, XSerializeType)
  50. if st != "" {
  51. rst, err := strconv.Atoi(st)
  52. if err != nil {
  53. return nil, err
  54. }
  55. req.SetSerializeType(protocol.SerializeType(rst))
  56. } else {
  57. return nil, errors.New("empty serialized type")
  58. }
  59. meta := getRpcxHeader(r, XMeta)
  60. if meta != "" {
  61. metadata, err := url.ParseQuery(meta)
  62. if err != nil {
  63. return nil, err
  64. }
  65. mm := make(map[string]string)
  66. for k, v := range metadata {
  67. if len(v) > 0 {
  68. mm[k] = v[0]
  69. }
  70. }
  71. req.Metadata = mm
  72. }
  73. req.Metadata = getUrlParams(r, req.Metadata)
  74. sp := getRpcxHeader(r, XServicePath)
  75. if sp != "" {
  76. req.ServicePath = sp
  77. } else {
  78. return nil, errors.New("empty servicepath")
  79. }
  80. sm := getRpcxHeader(r, XServiceMethod)
  81. if sm != "" {
  82. req.ServiceMethod = sm
  83. } else {
  84. return nil, errors.New("empty servicemethod")
  85. }
  86. payload, err := ioutil.ReadAll(r.Body)
  87. if err != nil {
  88. return nil, err
  89. }
  90. req.Payload = payload
  91. // Request.Body的读取, 读取数据时, 指针会对应移动至EOF, 所以下次读取的时候, seek指针还在EOF处
  92. // 后续无法获取请求数据,在此读完又存
  93. r.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
  94. return req, nil
  95. }
  96. func MultipartRequest2RpcxRequest(r *http.Request) (map[string]string, map[string][]*multipart.FileHeader, error) {
  97. r.ParseMultipartForm(10 << 20) //10mb
  98. form := r.MultipartForm
  99. formValues := make(map[string]string)
  100. //获取 multi-part/form header的 value
  101. h := r.Header
  102. meta := h.Get(XMeta)
  103. if meta != "" {
  104. metadata, err := url.ParseQuery(meta)
  105. if err != nil {
  106. return nil, nil, err
  107. }
  108. for k, v := range metadata {
  109. if len(v) > 0 {
  110. formValues[k] = v[0]
  111. }
  112. }
  113. }
  114. //获取 multi-part/form body中的form value
  115. for k, v := range form.Value {
  116. formValues[k] = v[0]
  117. }
  118. sp := h.Get(XServicePath)
  119. if sp != "" {
  120. formValues["reqService"] = sp
  121. } else {
  122. return nil, nil, errors.New("empty servicepath")
  123. }
  124. sm := h.Get(XServiceMethod)
  125. if sm != "" {
  126. formValues["reqMethod"] = sm
  127. } else {
  128. return nil, nil, errors.New("empty servicemethod")
  129. }
  130. formValues["authExclude"] = "false"
  131. formValues["fileNum"] = strconv.Itoa(len(form.File))
  132. return formValues, form.File, nil
  133. }
  134. func getRpcxHeader(r *http.Request, key string) string {
  135. val := r.Header.Get(key)
  136. if val != "" {
  137. return val
  138. } else {
  139. if values, ok := r.URL.Query()[key]; ok && len(values) > 0 {
  140. return values[0]
  141. } else {
  142. return ""
  143. }
  144. }
  145. }
  146. func getUrlParams(r *http.Request, metadata map[string]string) map[string]string {
  147. if len(metadata) == 0 {
  148. metadata = make(map[string]string)
  149. }
  150. query := r.URL.Query()
  151. for k, v := range query {
  152. if !strings.HasPrefix(k, "X-RPCX-") && len(v) > 0 {
  153. metadata[k] = v[0]
  154. }
  155. }
  156. return metadata
  157. }
  158. // func RpcxResponse2HttpResponse(res *protocol.Message) (url.Values, []byte, error) {
  159. // m := make(url.Values)
  160. // m.Set(XVersion, strconv.Itoa(int(res.Version())))
  161. // if res.IsHeartbeat() {
  162. // m.Set(XHeartbeat, "true")
  163. // }
  164. // if res.IsOneway() {
  165. // m.Set(XOneway, "true")
  166. // }
  167. // if res.MessageStatusType() == protocol.Error {
  168. // m.Set(XMessageStatusType, "Error")
  169. // } else {
  170. // m.Set(XMessageStatusType, "Normal")
  171. // }
  172. // if res.CompressType() == protocol.Gzip {
  173. // m.Set("Content-Encoding", "gzip")
  174. // }
  175. // m.Set(XSerializeType, strconv.Itoa(int(res.SerializeType())))
  176. // m.Set(XMessageID, strconv.FormatUint(res.Seq(), 10))
  177. // m.Set(XServicePath, res.ServicePath)
  178. // m.Set(XServiceMethod, res.ServiceMethod)
  179. // return m, res.Payload, nil
  180. // }