converter.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package gateway
  2. import (
  3. "bytes"
  4. "errors"
  5. "io/ioutil"
  6. "mime/multipart"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "github.com/smallnest/rpcx/protocol"
  11. )
  12. const (
  13. XVersion = "X-RPCX-Version"
  14. XMessageType = "X-RPCX-MesssageType"
  15. XHeartbeat = "X-RPCX-Heartbeat"
  16. XOneway = "X-RPCX-Oneway"
  17. XMessageStatusType = "X-RPCX-MessageStatusType"
  18. XSerializeType = "X-RPCX-SerializeType"
  19. XMessageID = "X-RPCX-MessageID"
  20. XServicePath = "X-RPCX-ServicePath"
  21. XServiceMethod = "X-RPCX-ServiceMethod"
  22. XMeta = "X-RPCX-Meta"
  23. XErrorMessage = "X-RPCX-ErrorMessage"
  24. )
  25. func HttpRequest2RpcxRequest(r *http.Request) (*protocol.Message, error) {
  26. req := protocol.NewMessage()
  27. req.SetMessageType(protocol.Request)
  28. h := r.Header
  29. seq := getRpcxHeader(r, XMessageID)
  30. if seq != "" {
  31. id, err := strconv.ParseUint(seq, 10, 64)
  32. if err != nil {
  33. return nil, err
  34. }
  35. req.SetSeq(id)
  36. }
  37. heartbeat := getRpcxHeader(r, XHeartbeat)
  38. if heartbeat != "" {
  39. req.SetHeartbeat(true)
  40. }
  41. oneway := getRpcxHeader(r, XOneway)
  42. if oneway != "" {
  43. req.SetOneway(true)
  44. }
  45. if h.Get("Content-Encoding") == "gzip" {
  46. req.SetCompressType(protocol.Gzip)
  47. }
  48. st := getRpcxHeader(r, XSerializeType)
  49. if st != "" {
  50. rst, err := strconv.Atoi(st)
  51. if err != nil {
  52. return nil, err
  53. }
  54. req.SetSerializeType(protocol.SerializeType(rst))
  55. } else {
  56. return nil, errors.New("empty serialized type")
  57. }
  58. meta := getRpcxHeader(r, XMeta)
  59. if meta != "" {
  60. metadata, err := url.ParseQuery(meta)
  61. if err != nil {
  62. return nil, err
  63. }
  64. mm := make(map[string]string)
  65. for k, v := range metadata {
  66. if len(v) > 0 {
  67. mm[k] = v[0]
  68. }
  69. }
  70. req.Metadata = mm
  71. }
  72. req.Metadata = getUrlParams(r, req.Metadata)
  73. sp := getRpcxHeader(r, XServicePath)
  74. if sp != "" {
  75. req.ServicePath = sp
  76. } else {
  77. return nil, errors.New("empty servicepath")
  78. }
  79. sm := getRpcxHeader(r, XServiceMethod)
  80. if sm != "" {
  81. req.ServiceMethod = sm
  82. } else {
  83. return nil, errors.New("empty servicemethod")
  84. }
  85. payload, err := ioutil.ReadAll(r.Body)
  86. if err != nil {
  87. return nil, err
  88. }
  89. req.Payload = payload
  90. // Request.Body的读取, 读取数据时, 指针会对应移动至EOF, 所以下次读取的时候, seek指针还在EOF处
  91. // 后续无法获取请求数据,在此读完又存
  92. r.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
  93. return req, nil
  94. }
  95. func MultipartRequest2RpcxRequest(r *http.Request) (map[string]string, *multipart.FileHeader, error) {
  96. r.ParseMultipartForm(10 << 20) //10mb
  97. form := r.MultipartForm
  98. formValues := make(map[string]string)
  99. //获取 multi-part/form header的 value
  100. h := r.Header
  101. meta := h.Get(XMeta)
  102. if meta != "" {
  103. metadata, err := url.ParseQuery(meta)
  104. if err != nil {
  105. return nil, nil, err
  106. }
  107. for k, v := range metadata {
  108. if len(v) > 0 {
  109. formValues[k] = v[0]
  110. }
  111. }
  112. }
  113. //获取 multi-part/form body中的form value
  114. for k, v := range form.Value {
  115. formValues[k] = v[0]
  116. }
  117. file := form.File["file"][0]
  118. formValues["fileName"] = file.Filename
  119. formValues["fileSize"] = strconv.FormatInt(file.Size, 10)
  120. sp := h.Get(XServicePath)
  121. if sp != "" {
  122. formValues["reqService"] = sp
  123. } else {
  124. return nil, nil, errors.New("empty servicepath")
  125. }
  126. sm := h.Get(XServiceMethod)
  127. if sm != "" {
  128. formValues["reqMethod"] = sm
  129. } else {
  130. return nil, nil, errors.New("empty servicemethod")
  131. }
  132. formValues["authExclude"] = "false"
  133. return formValues, form.File["file"][0], nil
  134. }
  135. func getRpcxHeader(r *http.Request, key string) string {
  136. val := r.Header.Get(key)
  137. if val != "" {
  138. return val
  139. } else {
  140. if values, ok := r.URL.Query()[key]; ok && len(values) > 0 {
  141. return values[0]
  142. } else {
  143. return ""
  144. }
  145. }
  146. }
  147. func getUrlParams(r *http.Request, metadata map[string]string) map[string]string {
  148. if len(metadata) == 0 {
  149. metadata = make(map[string]string)
  150. }
  151. query := r.URL.Query()
  152. for k, v := range query {
  153. if k != XMeta {
  154. metadata[k] = v[0]
  155. }
  156. }
  157. return metadata
  158. }
  159. // func RpcxResponse2HttpResponse(res *protocol.Message) (url.Values, []byte, error) {
  160. // m := make(url.Values)
  161. // m.Set(XVersion, strconv.Itoa(int(res.Version())))
  162. // if res.IsHeartbeat() {
  163. // m.Set(XHeartbeat, "true")
  164. // }
  165. // if res.IsOneway() {
  166. // m.Set(XOneway, "true")
  167. // }
  168. // if res.MessageStatusType() == protocol.Error {
  169. // m.Set(XMessageStatusType, "Error")
  170. // } else {
  171. // m.Set(XMessageStatusType, "Normal")
  172. // }
  173. // if res.CompressType() == protocol.Gzip {
  174. // m.Set("Content-Encoding", "gzip")
  175. // }
  176. // m.Set(XSerializeType, strconv.Itoa(int(res.SerializeType())))
  177. // m.Set(XMessageID, strconv.FormatUint(res.Seq(), 10))
  178. // m.Set(XServicePath, res.ServicePath)
  179. // m.Set(XServiceMethod, res.ServiceMethod)
  180. // return m, res.Payload, nil
  181. // }