converter.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package gateway
  2. import (
  3. "errors"
  4. "io/ioutil"
  5. "mime/multipart"
  6. "net/http"
  7. "net/url"
  8. "strconv"
  9. "github.com/smallnest/rpcx/protocol"
  10. )
  11. const (
  12. XVersion = "X-RPCX-Version"
  13. XMessageType = "X-RPCX-MesssageType"
  14. XHeartbeat = "X-RPCX-Heartbeat"
  15. XOneway = "X-RPCX-Oneway"
  16. XMessageStatusType = "X-RPCX-MessageStatusType"
  17. XSerializeType = "X-RPCX-SerializeType"
  18. XMessageID = "X-RPCX-MessageID"
  19. XServicePath = "X-RPCX-ServicePath"
  20. XServiceMethod = "X-RPCX-ServiceMethod"
  21. XMeta = "X-RPCX-Meta"
  22. XErrorMessage = "X-RPCX-ErrorMessage"
  23. )
  24. func HttpRequest2RpcxRequest(r *http.Request) (*protocol.Message, error) {
  25. req := protocol.NewMessage()
  26. req.SetMessageType(protocol.Request)
  27. h := r.Header
  28. seq := h.Get(XMessageID)
  29. if seq != "" {
  30. id, err := strconv.ParseUint(seq, 10, 64)
  31. if err != nil {
  32. return nil, err
  33. }
  34. req.SetSeq(id)
  35. }
  36. heartbeat := h.Get(XHeartbeat)
  37. if heartbeat != "" {
  38. req.SetHeartbeat(true)
  39. }
  40. oneway := h.Get(XOneway)
  41. if oneway != "" {
  42. req.SetOneway(true)
  43. }
  44. if h.Get("Content-Encoding") == "gzip" {
  45. req.SetCompressType(protocol.Gzip)
  46. }
  47. st := h.Get(XSerializeType)
  48. if st != "" {
  49. rst, err := strconv.Atoi(st)
  50. if err != nil {
  51. return nil, err
  52. }
  53. req.SetSerializeType(protocol.SerializeType(rst))
  54. } else {
  55. return nil, errors.New("empty serialized type")
  56. }
  57. meta := h.Get(XMeta)
  58. if meta != "" {
  59. metadata, err := url.ParseQuery(meta)
  60. if err != nil {
  61. return nil, err
  62. }
  63. mm := make(map[string]string)
  64. for k, v := range metadata {
  65. if len(v) > 0 {
  66. mm[k] = v[0]
  67. }
  68. }
  69. req.Metadata = mm
  70. }
  71. sp := h.Get(XServicePath)
  72. if sp != "" {
  73. req.ServicePath = sp
  74. } else {
  75. return nil, errors.New("empty servicepath")
  76. }
  77. sm := h.Get(XServiceMethod)
  78. if sm != "" {
  79. req.ServiceMethod = sm
  80. } else {
  81. return nil, errors.New("empty servicemethod")
  82. }
  83. payload, err := ioutil.ReadAll(r.Body)
  84. if err != nil {
  85. return nil, err
  86. }
  87. req.Payload = payload
  88. return req, nil
  89. }
  90. func MultipartRequest2RpcxRequest(r *http.Request) (map[string]string, *multipart.FileHeader, error) {
  91. r.ParseMultipartForm(10 << 20) //10mb
  92. form := r.MultipartForm
  93. formValues := make(map[string]string)
  94. //获取 multi-part/form header的 value
  95. h := r.Header
  96. meta := h.Get(XMeta)
  97. if meta != "" {
  98. metadata, err := url.ParseQuery(meta)
  99. if err != nil {
  100. return nil, nil, err
  101. }
  102. for k, v := range metadata {
  103. if len(v) > 0 {
  104. formValues[k] = v[0]
  105. }
  106. }
  107. }
  108. //获取 multi-part/form body中的form value
  109. for k, v := range form.Value {
  110. formValues[k] = v[0]
  111. }
  112. file := form.File["file"][0]
  113. formValues["fileName"] = file.Filename
  114. formValues["fileSize"] = strconv.FormatInt(file.Size, 10)
  115. sp := h.Get(XServicePath)
  116. if sp != "" {
  117. formValues["reqService"] = sp
  118. } else {
  119. return nil, nil, errors.New("empty servicepath")
  120. }
  121. sm := h.Get(XServiceMethod)
  122. if sm != "" {
  123. formValues["reqMethod"] = sm
  124. } else {
  125. return nil, nil, errors.New("empty servicemethod")
  126. }
  127. formValues["authExclude"] = "false"
  128. return formValues, form.File["file"][0], nil
  129. }
  130. // func RpcxResponse2HttpResponse(res *protocol.Message) (url.Values, []byte, error) {
  131. // m := make(url.Values)
  132. // m.Set(XVersion, strconv.Itoa(int(res.Version())))
  133. // if res.IsHeartbeat() {
  134. // m.Set(XHeartbeat, "true")
  135. // }
  136. // if res.IsOneway() {
  137. // m.Set(XOneway, "true")
  138. // }
  139. // if res.MessageStatusType() == protocol.Error {
  140. // m.Set(XMessageStatusType, "Error")
  141. // } else {
  142. // m.Set(XMessageStatusType, "Normal")
  143. // }
  144. // if res.CompressType() == protocol.Gzip {
  145. // m.Set("Content-Encoding", "gzip")
  146. // }
  147. // m.Set(XSerializeType, strconv.Itoa(int(res.SerializeType())))
  148. // m.Set(XMessageID, strconv.FormatUint(res.Seq(), 10))
  149. // m.Set(XServicePath, res.ServicePath)
  150. // m.Set(XServiceMethod, res.ServiceMethod)
  151. // return m, res.Payload, nil
  152. // }