converter.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package gateway
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io/ioutil"
  6. "log"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "path"
  11. "strconv"
  12. "github.com/smallnest/rpcx/protocol"
  13. )
  14. const (
  15. XVersion = "X-RPCX-Version"
  16. XMessageType = "X-RPCX-MesssageType"
  17. XHeartbeat = "X-RPCX-Heartbeat"
  18. XOneway = "X-RPCX-Oneway"
  19. XMessageStatusType = "X-RPCX-MessageStatusType"
  20. XSerializeType = "X-RPCX-SerializeType"
  21. XMessageID = "X-RPCX-MessageID"
  22. XServicePath = "X-RPCX-ServicePath"
  23. XServiceMethod = "X-RPCX-ServiceMethod"
  24. XMeta = "X-RPCX-Meta"
  25. XErrorMessage = "X-RPCX-ErrorMessage"
  26. )
  27. func HttpRequest2RpcxRequest(r *http.Request) (*protocol.Message, error) {
  28. req := protocol.NewMessage()
  29. req.SetMessageType(protocol.Request)
  30. h := r.Header
  31. seq := h.Get(XMessageID)
  32. if seq != "" {
  33. id, err := strconv.ParseUint(seq, 10, 64)
  34. if err != nil {
  35. return nil, err
  36. }
  37. req.SetSeq(id)
  38. }
  39. heartbeat := h.Get(XHeartbeat)
  40. if heartbeat != "" {
  41. req.SetHeartbeat(true)
  42. }
  43. oneway := h.Get(XOneway)
  44. if oneway != "" {
  45. req.SetOneway(true)
  46. }
  47. if h.Get("Content-Encoding") == "gzip" {
  48. req.SetCompressType(protocol.Gzip)
  49. }
  50. st := h.Get(XSerializeType)
  51. if st != "" {
  52. rst, err := strconv.Atoi(st)
  53. if err != nil {
  54. return nil, err
  55. }
  56. req.SetSerializeType(protocol.SerializeType(rst))
  57. } else {
  58. return nil, errors.New("empty serialized type")
  59. }
  60. meta := h.Get(XMeta)
  61. if meta != "" {
  62. metadata, err := url.ParseQuery(meta)
  63. if err != nil {
  64. return nil, err
  65. }
  66. mm := make(map[string]string)
  67. for k, v := range metadata {
  68. if len(v) > 0 {
  69. mm[k] = v[0]
  70. }
  71. }
  72. req.Metadata = mm
  73. }
  74. sp := h.Get(XServicePath)
  75. if sp != "" {
  76. req.ServicePath = sp
  77. } else {
  78. return nil, errors.New("empty servicepath")
  79. }
  80. sm := h.Get(XServiceMethod)
  81. if sm != "" {
  82. req.ServiceMethod = sm
  83. } else {
  84. return nil, errors.New("empty servicemethod")
  85. }
  86. payload, err := ioutil.ReadAll(r.Body)
  87. if err != nil {
  88. return nil, err
  89. }
  90. req.Payload = payload
  91. return req, nil
  92. }
  93. func MultipartRequest2RpcxRequest(r *http.Request) (*protocol.Message, string, error) {
  94. req := protocol.NewMessage()
  95. req.SetMessageType(protocol.Request)
  96. h := r.Header
  97. seq := h.Get(XMessageID)
  98. if seq != "" {
  99. id, err := strconv.ParseUint(seq, 10, 64)
  100. if err != nil {
  101. return nil, "", err
  102. }
  103. req.SetSeq(id)
  104. }
  105. heartbeat := h.Get(XHeartbeat)
  106. if heartbeat != "" {
  107. req.SetHeartbeat(true)
  108. }
  109. oneway := h.Get(XOneway)
  110. if oneway != "" {
  111. req.SetOneway(true)
  112. }
  113. if h.Get("Content-Encoding") == "gzip" {
  114. req.SetCompressType(protocol.Gzip)
  115. }
  116. st := h.Get(XSerializeType)
  117. if st != "" {
  118. rst, err := strconv.Atoi(st)
  119. if err != nil {
  120. return nil, "", err
  121. }
  122. req.SetSerializeType(protocol.SerializeType(rst))
  123. } else {
  124. return nil, "", errors.New("empty serialized type")
  125. }
  126. meta := h.Get(XMeta)
  127. if meta != "" {
  128. metadata, err := url.ParseQuery(meta)
  129. if err != nil {
  130. return nil, "", err
  131. }
  132. mm := make(map[string]string)
  133. for k, v := range metadata {
  134. if len(v) > 0 {
  135. mm[k] = v[0]
  136. }
  137. }
  138. req.Metadata = mm
  139. }
  140. sp := h.Get(XServicePath)
  141. if sp != "" {
  142. req.ServicePath = sp
  143. } else {
  144. return nil, "", errors.New("empty servicepath")
  145. }
  146. sm := h.Get(XServiceMethod)
  147. if sm != "" {
  148. req.ServiceMethod = sm
  149. } else {
  150. return nil, "", errors.New("empty servicemethod")
  151. }
  152. multipartReader, readerErr := r.MultipartReader()
  153. if readerErr != nil {
  154. return nil, "", readerErr
  155. }
  156. form, parseErr := multipartReader.ReadForm(32 << 20)
  157. if parseErr != nil {
  158. return nil, "", parseErr
  159. }
  160. metaForm := make(map[string]string)
  161. for k, v := range form.Value {
  162. metaForm[k] = v[0]
  163. }
  164. jstr, _ := json.Marshal(metaForm)
  165. req.Payload = jstr
  166. fh := form.File["file"][0]
  167. fi, _ := fh.Open()
  168. buf, _ := ioutil.ReadAll(fi)
  169. suffix := path.Ext(fh.Filename)
  170. tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
  171. if err != nil {
  172. log.Fatal("Cannot create temporary file", err)
  173. return nil, "", err
  174. }
  175. _, err = tmpFile.Write(buf)
  176. if err != nil {
  177. log.Fatal("Failed to write to temporary file", err)
  178. return nil, "", err
  179. }
  180. return req, tmpFile.Name(), nil
  181. }
  182. // func RpcxResponse2HttpResponse(res *protocol.Message) (url.Values, []byte, error) {
  183. // m := make(url.Values)
  184. // m.Set(XVersion, strconv.Itoa(int(res.Version())))
  185. // if res.IsHeartbeat() {
  186. // m.Set(XHeartbeat, "true")
  187. // }
  188. // if res.IsOneway() {
  189. // m.Set(XOneway, "true")
  190. // }
  191. // if res.MessageStatusType() == protocol.Error {
  192. // m.Set(XMessageStatusType, "Error")
  193. // } else {
  194. // m.Set(XMessageStatusType, "Normal")
  195. // }
  196. // if res.CompressType() == protocol.Gzip {
  197. // m.Set("Content-Encoding", "gzip")
  198. // }
  199. // m.Set(XSerializeType, strconv.Itoa(int(res.SerializeType())))
  200. // m.Set(XMessageID, strconv.FormatUint(res.Seq(), 10))
  201. // m.Set(XServicePath, res.ServicePath)
  202. // m.Set(XServiceMethod, res.ServiceMethod)
  203. // return m, res.Payload, nil
  204. // }