// Package gateway converts incoming HTTP requests into rpcx protocol
// request messages.
package gateway

import (
	"encoding/json"
	"errors"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"
	"os"
	"path"
	"strconv"

	"github.com/smallnest/rpcx/protocol"
)

// HTTP header names that carry rpcx message fields.
const (
	XVersion = "X-RPCX-Version"
	// XMessageType is spelled with three "s" characters. The value is kept
	// exactly as-is because changing it would change the header name on the
	// wire.
	XMessageType = "X-RPCX-MesssageType"
	// XHeartbeat marks the request as a heartbeat when set to any non-empty
	// value.
	XHeartbeat = "X-RPCX-Heartbeat"
	// XOneway marks the request as one-way when set to any non-empty value.
	XOneway            = "X-RPCX-Oneway"
	XMessageStatusType = "X-RPCX-MessageStatusType"
	// XSerializeType carries the numeric serialize type. It is required.
	XSerializeType = "X-RPCX-SerializeType"
	// XMessageID carries the message sequence number as a base-10 uint64.
	XMessageID = "X-RPCX-MessageID"
	// XServicePath carries the target service path. It is required.
	XServicePath = "X-RPCX-ServicePath"
	// XServiceMethod carries the target service method. It is required.
	XServiceMethod = "X-RPCX-ServiceMethod"
	// XMeta carries request metadata encoded as a URL query string.
	XMeta         = "X-RPCX-Meta"
	XErrorMessage = "X-RPCX-ErrorMessage"
)

// maxMultipartMemory is the number of bytes of a multipart form that
// ReadForm may keep in memory.
const maxMultipartMemory = 32 << 20

// newRequestFromHeader builds an rpcx request message from the X-RPCX-*
// headers in h. It fills in everything except the payload.
//
// It returns an error when the message ID or serialize type is not a valid
// number, when the metadata is not a valid query string, or when the
// serialize type, service path or service method header is missing.
func newRequestFromHeader(h http.Header) (*protocol.Message, error) {
	req := protocol.NewMessage()
	req.SetMessageType(protocol.Request)

	if seq := h.Get(XMessageID); seq != "" {
		id, err := strconv.ParseUint(seq, 10, 64)
		if err != nil {
			return nil, err
		}
		req.SetSeq(id)
	}

	// Any non-empty value enables these flags; the content is not inspected.
	if h.Get(XHeartbeat) != "" {
		req.SetHeartbeat(true)
	}
	if h.Get(XOneway) != "" {
		req.SetOneway(true)
	}

	if h.Get("Content-Encoding") == "gzip" {
		req.SetCompressType(protocol.Gzip)
	}

	st := h.Get(XSerializeType)
	if st == "" {
		return nil, errors.New("empty serialized type")
	}
	rst, err := strconv.Atoi(st)
	if err != nil {
		return nil, err
	}
	req.SetSerializeType(protocol.SerializeType(rst))

	if meta := h.Get(XMeta); meta != "" {
		values, err := url.ParseQuery(meta)
		if err != nil {
			return nil, err
		}
		// Only the first value of each key is kept.
		md := make(map[string]string, len(values))
		for k, v := range values {
			if len(v) > 0 {
				md[k] = v[0]
			}
		}
		req.Metadata = md
	}

	sp := h.Get(XServicePath)
	if sp == "" {
		return nil, errors.New("empty servicepath")
	}
	req.ServicePath = sp

	sm := h.Get(XServiceMethod)
	if sm == "" {
		return nil, errors.New("empty servicemethod")
	}
	req.ServiceMethod = sm

	return req, nil
}

// writeTempFile copies src into a new file in the OS temporary directory
// named "multipart-<random><suffix>" and returns the file's path. The file
// is closed before returning and is removed again if writing fails.
func writeTempFile(src io.Reader, suffix string) (string, error) {
	tmp, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
	if err != nil {
		return "", err
	}
	name := tmp.Name()

	if _, err := io.Copy(tmp, src); err != nil {
		// Best-effort cleanup; the copy error is the one worth reporting.
		_ = tmp.Close()
		_ = os.Remove(name)
		return "", err
	}
	if err := tmp.Close(); err != nil {
		_ = os.Remove(name)
		return "", err
	}
	return name, nil
}

// HttpRequest2RpcxRequest converts an HTTP request into an rpcx request
// message. Message fields are taken from the X-RPCX-* headers and the
// payload is the entire request body.
//
// It returns a nil message and an error when a header is invalid, a required
// header is missing, or the body cannot be read.
func HttpRequest2RpcxRequest(r *http.Request) (*protocol.Message, error) {
	req, err := newRequestFromHeader(r.Header)
	if err != nil {
		return nil, err
	}

	payload, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return nil, err
	}
	req.Payload = payload

	return req, nil
}

// MultipartRequest2RpcxRequest converts a multipart/form-data HTTP request
// into an rpcx request message.
//
// Message fields are taken from the X-RPCX-* headers. The payload is a JSON
// object built from the first value of every non-file form field. The first
// upload in the form field "file" is written to a temporary file, and that
// file's path is the second return value. The temporary file is not removed
// here.
//
// It returns an error instead of panicking or terminating the process when
// the form has no "file" upload or the upload cannot be opened or stored.
func MultipartRequest2RpcxRequest(r *http.Request) (*protocol.Message, string, error) {
	req, err := newRequestFromHeader(r.Header)
	if err != nil {
		return nil, "", err
	}

	mr, err := r.MultipartReader()
	if err != nil {
		return nil, "", err
	}
	form, err := mr.ReadForm(maxMultipartMemory)
	if err != nil {
		return nil, "", err
	}
	// ReadForm may spool large parts to disk; release them once the upload
	// has been copied to its own temporary file.
	defer func() {
		if err := form.RemoveAll(); err != nil {
			log.Printf("gateway: removing multipart temporary files: %v", err)
		}
	}()

	fields := make(map[string]string, len(form.Value))
	for k, v := range form.Value {
		if len(v) > 0 {
			fields[k] = v[0]
		}
	}
	payload, err := json.Marshal(fields)
	if err != nil {
		return nil, "", err
	}
	req.Payload = payload

	files := form.File["file"]
	if len(files) == 0 {
		return nil, "", errors.New(`missing multipart file field "file"`)
	}
	fh := files[0]

	src, err := fh.Open()
	if err != nil {
		return nil, "", err
	}
	defer src.Close()

	// Keep the upload's extension so the stored file retains its suffix.
	name, err := writeTempFile(src, path.Ext(fh.Filename))
	if err != nil {
		return nil, "", err
	}

	return req, name, nil
}

// NOTE(review): the function below is commented out in the original source
// and is kept here unchanged for reference.
//
// func RpcxResponse2HttpResponse(res *protocol.Message) (url.Values, []byte, error) {
// 	m := make(url.Values)
// 	m.Set(XVersion, strconv.Itoa(int(res.Version())))
// 	if res.IsHeartbeat() {
// 		m.Set(XHeartbeat, "true")
// 	}
// 	if res.IsOneway() {
// 		m.Set(XOneway, "true")
// 	}
// 	if res.MessageStatusType() == protocol.Error {
// 		m.Set(XMessageStatusType, "Error")
// 	} else {
// 		m.Set(XMessageStatusType, "Normal")
// 	}
// 	if res.CompressType() == protocol.Gzip {
// 		m.Set("Content-Encoding", "gzip")
// 	}
// 	m.Set(XSerializeType, strconv.Itoa(int(res.SerializeType())))
// 	m.Set(XMessageID, strconv.FormatUint(res.Seq(), 10))
// 	m.Set(XServicePath, res.ServicePath)
// 	m.Set(XServiceMethod, res.ServiceMethod)
// 	return m, res.Payload, nil
// }