converter.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package gateway
  2. import (
  3. "bytes"
  4. "errors"
  5. "io/ioutil"
  6. "mime/multipart"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "github.com/smallnest/rpcx/protocol"
  11. )
  12. const (
  13. XVersion = "X-RPCX-Version"
  14. XMessageType = "X-RPCX-MesssageType"
  15. XHeartbeat = "X-RPCX-Heartbeat"
  16. XOneway = "X-RPCX-Oneway"
  17. XMessageStatusType = "X-RPCX-MessageStatusType"
  18. XSerializeType = "X-RPCX-SerializeType"
  19. XMessageID = "X-RPCX-MessageID"
  20. XServicePath = "X-RPCX-ServicePath"
  21. XServiceMethod = "X-RPCX-ServiceMethod"
  22. XMeta = "X-RPCX-Meta"
  23. XErrorMessage = "X-RPCX-ErrorMessage"
  24. )
  25. func HttpRequest2RpcxRequest(r *http.Request) (*protocol.Message, error) {
  26. req := protocol.NewMessage()
  27. req.SetMessageType(protocol.Request)
  28. h := r.Header
  29. seq := h.Get(XMessageID)
  30. if seq != "" {
  31. id, err := strconv.ParseUint(seq, 10, 64)
  32. if err != nil {
  33. return nil, err
  34. }
  35. req.SetSeq(id)
  36. }
  37. heartbeat := h.Get(XHeartbeat)
  38. if heartbeat != "" {
  39. req.SetHeartbeat(true)
  40. }
  41. oneway := h.Get(XOneway)
  42. if oneway != "" {
  43. req.SetOneway(true)
  44. }
  45. if h.Get("Content-Encoding") == "gzip" {
  46. req.SetCompressType(protocol.Gzip)
  47. }
  48. st := h.Get(XSerializeType)
  49. if st != "" {
  50. rst, err := strconv.Atoi(st)
  51. if err != nil {
  52. return nil, err
  53. }
  54. req.SetSerializeType(protocol.SerializeType(rst))
  55. } else {
  56. return nil, errors.New("empty serialized type")
  57. }
  58. meta := h.Get(XMeta)
  59. if meta != "" {
  60. metadata, err := url.ParseQuery(meta)
  61. if err != nil {
  62. return nil, err
  63. }
  64. mm := make(map[string]string)
  65. for k, v := range metadata {
  66. if len(v) > 0 {
  67. mm[k] = v[0]
  68. }
  69. }
  70. req.Metadata = mm
  71. }
  72. sp := h.Get(XServicePath)
  73. if sp != "" {
  74. req.ServicePath = sp
  75. } else {
  76. return nil, errors.New("empty servicepath")
  77. }
  78. sm := h.Get(XServiceMethod)
  79. if sm != "" {
  80. req.ServiceMethod = sm
  81. } else {
  82. return nil, errors.New("empty servicemethod")
  83. }
  84. payload, err := ioutil.ReadAll(r.Body)
  85. if err != nil {
  86. return nil, err
  87. }
  88. req.Payload = payload
  89. // Request.Body的读取, 读取数据时, 指针会对应移动至EOF, 所以下次读取的时候, seek指针还在EOF处
  90. // 后续无法获取请求数据,在此读完又存
  91. r.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
  92. return req, nil
  93. }
  94. func MultipartRequest2RpcxRequest(r *http.Request) (map[string]string, *multipart.FileHeader, error) {
  95. r.ParseMultipartForm(10 << 20) //10mb
  96. form := r.MultipartForm
  97. formValues := make(map[string]string)
  98. //获取 multi-part/form header的 value
  99. h := r.Header
  100. meta := h.Get(XMeta)
  101. if meta != "" {
  102. metadata, err := url.ParseQuery(meta)
  103. if err != nil {
  104. return nil, nil, err
  105. }
  106. for k, v := range metadata {
  107. if len(v) > 0 {
  108. formValues[k] = v[0]
  109. }
  110. }
  111. }
  112. //获取 multi-part/form body中的form value
  113. for k, v := range form.Value {
  114. formValues[k] = v[0]
  115. }
  116. file := form.File["file"][0]
  117. formValues["fileName"] = file.Filename
  118. formValues["fileSize"] = strconv.FormatInt(file.Size, 10)
  119. sp := h.Get(XServicePath)
  120. if sp != "" {
  121. formValues["reqService"] = sp
  122. } else {
  123. return nil, nil, errors.New("empty servicepath")
  124. }
  125. sm := h.Get(XServiceMethod)
  126. if sm != "" {
  127. formValues["reqMethod"] = sm
  128. } else {
  129. return nil, nil, errors.New("empty servicemethod")
  130. }
  131. formValues["authExclude"] = "false"
  132. return formValues, form.File["file"][0], nil
  133. }
  134. // func RpcxResponse2HttpResponse(res *protocol.Message) (url.Values, []byte, error) {
  135. // m := make(url.Values)
  136. // m.Set(XVersion, strconv.Itoa(int(res.Version())))
  137. // if res.IsHeartbeat() {
  138. // m.Set(XHeartbeat, "true")
  139. // }
  140. // if res.IsOneway() {
  141. // m.Set(XOneway, "true")
  142. // }
  143. // if res.MessageStatusType() == protocol.Error {
  144. // m.Set(XMessageStatusType, "Error")
  145. // } else {
  146. // m.Set(XMessageStatusType, "Normal")
  147. // }
  148. // if res.CompressType() == protocol.Gzip {
  149. // m.Set("Content-Encoding", "gzip")
  150. // }
  151. // m.Set(XSerializeType, strconv.Itoa(int(res.SerializeType())))
  152. // m.Set(XMessageID, strconv.FormatUint(res.Seq(), 10))
  153. // m.Set(XServicePath, res.ServicePath)
  154. // m.Set(XServiceMethod, res.ServiceMethod)
  155. // return m, res.Payload, nil
  156. // }