invoker.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package dynamic
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/errors/gerror"
  6. "github.com/gogf/gf/frame/g"
  7. "github.com/smallnest/rpcx/log"
  8. "github.com/smallnest/rpcx/share"
  9. "reflect"
  10. "runtime"
  11. "sync"
  12. "unicode"
  13. "unicode/utf8"
  14. )
  15. var Invoker *InvokerImpl
  16. func init() {
  17. Invoker = NewInvoker()
  18. }
  19. type InvokerImpl struct {
  20. classMapMu sync.RWMutex
  21. classMap map[string]*service
  22. }
  23. func NewInvoker() *InvokerImpl {
  24. return &InvokerImpl{
  25. classMap: make(map[string]*service),
  26. }
  27. }
  // Interface types computed once and compared against method signatures
  // while scanning a receiver for invocable methods.
  var (
  	// typeOfError is the reflect.Type of the error interface.
  	typeOfError = reflect.TypeOf((*error)(nil)).Elem()
  	// typeOfContext is the reflect.Type of the context.Context interface.
  	typeOfContext = reflect.TypeOf((*context.Context)(nil)).Elem()
  )
  30. type service struct {
  31. name string // name of service
  32. rcvr reflect.Value // receiver of methods for the service
  33. typ reflect.Type // type of the receiver
  34. method map[string]*methodType // registered methods
  35. }
  // methodType describes a single invocable method of a registered service.
  type methodType struct {
  	// The embedded mutex was meant to protect call counters; the only
  	// counter (numCalls) is currently commented out below.
  	sync.Mutex

  	// method is the reflected method, including the Func used to call it.
  	method reflect.Method
  	// ArgType is the type of the method's argument parameter.
  	ArgType reflect.Type
  	// ReplyType is the type of the method's reply parameter.
  	ReplyType reflect.Type
  	// numCalls uint
  }
  // isExported reports whether name starts with an upper-case letter.
  // An empty name is not exported.
  func isExported(name string) bool {
  	// Decode only the leading rune; the local is deliberately not called
  	// "rune" so the predeclared type is not shadowed.
  	first, _ := utf8.DecodeRuneInString(name)
  	return unicode.IsUpper(first)
  }
  47. func isExportedOrBuiltinType(t reflect.Type) bool {
  48. for t.Kind() == reflect.Ptr {
  49. t = t.Elem()
  50. }
  51. return isExported(t.Name()) || t.PkgPath() == ""
  52. }
  53. func (s *InvokerImpl) Register(class interface{}) error {
  54. s.classMapMu.Lock()
  55. defer s.classMapMu.Unlock()
  56. service := new(service)
  57. service.typ = reflect.TypeOf(class)
  58. service.rcvr = reflect.ValueOf(class)
  59. sname := reflect.Indirect(service.rcvr).Type().Name() // Type
  60. if sname == "" {
  61. errorStr := "InvokerImpl.Register: no service name for type " + service.typ.String()
  62. g.Log().Error(errorStr)
  63. return gerror.New(errorStr)
  64. }
  65. if !isExported(sname) {
  66. errorStr := "InvokerImpl.Register: type " + sname + " is not exported"
  67. g.Log().Error(errorStr)
  68. return gerror.New(errorStr)
  69. }
  70. service.name = sname
  71. // Install the methods
  72. service.method = suitableMethods(service.typ, true)
  73. if len(service.method) == 0 {
  74. var errorStr string
  75. method := suitableMethods(reflect.PtrTo(service.typ), false)
  76. if len(method) != 0 {
  77. errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
  78. } else {
  79. errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type"
  80. }
  81. g.Log().Error(errorStr)
  82. return gerror.New(errorStr)
  83. }
  84. s.classMap[service.name] = service
  85. return nil
  86. }
  87. func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
  88. methods := make(map[string]*methodType)
  89. for m := 0; m < typ.NumMethod(); m++ {
  90. method := typ.Method(m)
  91. mtype := method.Type
  92. mname := method.Name
  93. // Method must be exported.
  94. if method.PkgPath != "" {
  95. continue
  96. }
  97. // Method needs four ins: receiver, context.Context, *args, *reply.
  98. if mtype.NumIn() != 4 {
  99. if reportErr {
  100. log.Debug("method ", mname, " has wrong number of ins:", mtype.NumIn())
  101. }
  102. continue
  103. }
  104. // First arg must be context.Context
  105. ctxType := mtype.In(1)
  106. if !ctxType.Implements(typeOfContext) {
  107. if reportErr {
  108. log.Debug("method ", mname, " must use context.Context as the first parameter")
  109. }
  110. continue
  111. }
  112. // Second arg need not be a pointer.
  113. argType := mtype.In(2)
  114. if !isExportedOrBuiltinType(argType) {
  115. if reportErr {
  116. log.Info(mname, " parameter type not exported: ", argType)
  117. }
  118. continue
  119. }
  120. // Third arg must be a pointer.
  121. replyType := mtype.In(3)
  122. if replyType.Kind() != reflect.Ptr {
  123. if reportErr {
  124. log.Info("method", mname, " reply type not a pointer:", replyType)
  125. }
  126. continue
  127. }
  128. // Reply type must be exported.
  129. if !isExportedOrBuiltinType(replyType) {
  130. if reportErr {
  131. log.Info("method", mname, " reply type not exported:", replyType)
  132. }
  133. continue
  134. }
  135. // Method needs one out.
  136. if mtype.NumOut() != 1 {
  137. if reportErr {
  138. log.Info("method", mname, " has wrong number of outs:", mtype.NumOut())
  139. }
  140. continue
  141. }
  142. // The return type of the method must be error.
  143. if returnType := mtype.Out(0); returnType != typeOfError {
  144. if reportErr {
  145. log.Info("method", mname, " returns ", returnType.String(), " not error")
  146. }
  147. continue
  148. }
  149. methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
  150. reflectTypePools.Init(argType)
  151. reflectTypePools.Init(replyType)
  152. }
  153. return methods
  154. }
  155. func (s *InvokerImpl) HandleInvoker(ctx context.Context, req *Message) (res *Message, err error) {
  156. s.classMapMu.RLock()
  157. service := s.classMap[req.ClassName]
  158. if share.Trace {
  159. log.Debugf("server get service %+v for an request", service)
  160. }
  161. s.classMapMu.RUnlock()
  162. res = req.Clone()
  163. if service == nil {
  164. }
  165. mtype := service.method[req.MethodName]
  166. if mtype == nil {
  167. }
  168. args := reflectTypePools.Get(mtype.ArgType)
  169. if mtype.ArgType == reflect.TypeOf(req.Payload) {
  170. args = req.Payload
  171. }
  172. reply := reflectTypePools.Get(mtype.ReplyType)
  173. if mtype.ArgType.Kind() != reflect.Ptr {
  174. err = service.call(ctx, mtype, reflect.ValueOf(args).Elem(), reflect.ValueOf(reply))
  175. } else {
  176. err = service.call(ctx, mtype, reflect.ValueOf(args), reflect.ValueOf(reply))
  177. }
  178. reflectTypePools.Put(mtype.ArgType, args)
  179. if err != nil {
  180. if reply != nil {
  181. reflectTypePools.Put(mtype.ReplyType, reply)
  182. res.Payload = reply
  183. }
  184. } else {
  185. // return reply to object pool
  186. reflectTypePools.Put(mtype.ReplyType, reply)
  187. res.Payload = reply
  188. }
  189. return res, nil
  190. }
  191. func (s *service) call(ctx context.Context, mtype *methodType, args, reply reflect.Value) (err error) {
  192. defer func() {
  193. if r := recover(); r != nil {
  194. buf := make([]byte, 4096)
  195. n := runtime.Stack(buf, false)
  196. buf = buf[:n]
  197. err = fmt.Errorf("[service internal error]: %v, method: %s, args: %+v, stack: %s",
  198. r, mtype.method.Name, args.Interface(), buf)
  199. log.Error(err)
  200. }
  201. }()
  202. function := mtype.method.Func
  203. // Invoke the method, providing a new value for the reply.
  204. returnValues := function.Call([]reflect.Value{s.rcvr, reflect.ValueOf(ctx), args, reply})
  205. // The return value for the method is an error.
  206. errInter := returnValues[0].Interface()
  207. if errInter != nil {
  208. return errInter.(error)
  209. }
  210. return nil
  211. }
  // Message is a request to, or a response from, an invoked service method.
  type Message struct {
  	// ClassName is the name of the registered service to call.
  	ClassName string
  	// MethodName is the name of the method to call on that service.
  	MethodName string
  	// Metadata carries string key/value pairs alongside the payload.
  	Metadata map[string]string
  	// Payload is the argument of a request or the reply of a response.
  	Payload interface{}
  }

  // Clone returns a new message with the same ClassName and MethodName and an
  // independent copy of Metadata, so that changing the clone's metadata does
  // not affect m. A nil Metadata stays nil. Payload is not copied; the clone's
  // Payload is nil.
  func (m Message) Clone() *Message {
  	var metadata map[string]string
  	if m.Metadata != nil {
  		metadata = make(map[string]string, len(m.Metadata))
  		for k, v := range m.Metadata {
  			metadata[k] = v
  		}
  	}
  	return &Message{
  		ClassName:  m.ClassName,
  		MethodName: m.MethodName,
  		Metadata:   metadata,
  	}
  }