server.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package echo
  2. import (
  3. "net/http"
  4. "strings"
  5. "github.com/labstack/echo"
  6. . "github.com/rpcxio/rpcx-gateway"
  7. )
  8. type Server struct {
  9. addr string
  10. e *echo.Echo
  11. }
  12. // New returns a server.
  13. func New(addr string) *Server {
  14. return &Server{
  15. addr: addr,
  16. }
  17. }
  18. // NewWithEcho returns a server with preconfigured echo.
  19. func NewWithEcho(addr string, e *echo.Echo) *Server {
  20. return &Server{
  21. addr: addr,
  22. e: e,
  23. }
  24. }
  25. // RegisterHandler configures the handler to handle http rpcx invoke.
  26. // It wraps ServiceHandler into httprouter.Handle.
  27. func (s *Server) RegisterHandler(base string, handler ServiceHandler) {
  28. e := s.e
  29. if e == nil {
  30. e = echo.New()
  31. }
  32. h := wrapServiceHandler(handler)
  33. e.POST(base, h)
  34. e.GET(base, h)
  35. e.PUT(base, h)
  36. s.e = e
  37. }
  38. func wrapServiceHandler(handler ServiceHandler) echo.HandlerFunc {
  39. return func(ctx echo.Context) error {
  40. r := ctx.Request()
  41. w := ctx.Response()
  42. if r.Header.Get(XServicePath) == "" {
  43. servicePath := ctx.Param("servicePath")
  44. if strings.HasPrefix(servicePath, "/") {
  45. servicePath = servicePath[1:]
  46. }
  47. r.Header.Set(XServicePath, servicePath)
  48. }
  49. servicePath := r.Header.Get(XServicePath)
  50. messageID := r.Header.Get(XMessageID)
  51. wh := w.Header()
  52. if messageID != "" {
  53. wh.Set(XMessageID, messageID)
  54. }
  55. meta, payload, err := handler(r, servicePath)
  56. for k, v := range meta {
  57. wh.Set(k, v)
  58. }
  59. if err == nil {
  60. ctx.Blob(http.StatusOK, "application/octet-stream", payload)
  61. return nil
  62. }
  63. rh := r.Header
  64. for k, v := range rh {
  65. if strings.HasPrefix(k, "X-RPCX-") && len(v) > 0 {
  66. wh.Set(k, v[0])
  67. }
  68. }
  69. wh.Set(XMessageStatusType, "Error")
  70. wh.Set(XErrorMessage, err.Error())
  71. ctx.String(http.StatusOK, err.Error())
  72. return nil
  73. }
  74. }
  75. func (s *Server) Serve() error {
  76. return s.e.Start(s.addr)
  77. }