invoker.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package dynamic
  2. import (
  3. "context"
  4. "dashoo.cn/opms_libary/myerrors"
  5. "fmt"
  6. "github.com/gogf/gf/errors/gerror"
  7. "github.com/gogf/gf/frame/g"
  8. "github.com/gogf/gf/util/gconv"
  9. "github.com/smallnest/rpcx/log"
  10. "github.com/smallnest/rpcx/share"
  11. "reflect"
  12. "runtime"
  13. "sync"
  14. "unicode"
  15. "unicode/utf8"
  16. )
  17. var Invoker *InvokerImpl
  18. func init() {
  19. Invoker = NewInvoker()
  20. }
  21. type InvokerImpl struct {
  22. classMapMu sync.RWMutex
  23. classMap map[string]*service
  24. }
  25. func NewInvoker() *InvokerImpl {
  26. return &InvokerImpl{
  27. classMap: make(map[string]*service),
  28. }
  29. }
  30. var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
  31. var typeOfContext = reflect.TypeOf((*context.Context)(nil)).Elem()
  32. type service struct {
  33. name string // name of service
  34. rcvr reflect.Value // receiver of methods for the service
  35. typ reflect.Type // type of the receiver
  36. method map[string]*methodType // registered methods
  37. }
  38. type methodType struct {
  39. sync.Mutex // protects counters
  40. method reflect.Method
  41. ArgType reflect.Type
  42. ReplyType reflect.Type
  43. // numCalls uint
  44. }
  45. func isExported(name string) bool {
  46. rune, _ := utf8.DecodeRuneInString(name)
  47. return unicode.IsUpper(rune)
  48. }
  49. func isExportedOrBuiltinType(t reflect.Type) bool {
  50. for t.Kind() == reflect.Ptr {
  51. t = t.Elem()
  52. }
  53. return isExported(t.Name()) || t.PkgPath() == ""
  54. }
  55. func (s *InvokerImpl) Register(class interface{}) error {
  56. s.classMapMu.Lock()
  57. defer s.classMapMu.Unlock()
  58. service := new(service)
  59. service.typ = reflect.TypeOf(class)
  60. service.rcvr = reflect.ValueOf(class)
  61. sname := reflect.Indirect(service.rcvr).Type().Name() // Type
  62. if sname == "" {
  63. errorStr := "InvokerImpl.Register: no service name for type " + service.typ.String()
  64. g.Log().Error(errorStr)
  65. return gerror.New(errorStr)
  66. }
  67. if !isExported(sname) {
  68. errorStr := "InvokerImpl.Register: type " + sname + " is not exported"
  69. g.Log().Error(errorStr)
  70. return gerror.New(errorStr)
  71. }
  72. service.name = sname
  73. // Install the methods
  74. service.method = suitableMethods(service.typ, true)
  75. if len(service.method) == 0 {
  76. var errorStr string
  77. method := suitableMethods(reflect.PtrTo(service.typ), false)
  78. if len(method) != 0 {
  79. errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
  80. } else {
  81. errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type"
  82. }
  83. g.Log().Error(errorStr)
  84. return gerror.New(errorStr)
  85. }
  86. s.classMap[service.name] = service
  87. return nil
  88. }
  89. func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
  90. methods := make(map[string]*methodType)
  91. for m := 0; m < typ.NumMethod(); m++ {
  92. method := typ.Method(m)
  93. mtype := method.Type
  94. mname := method.Name
  95. // Method must be exported.
  96. if method.PkgPath != "" {
  97. continue
  98. }
  99. // Method needs four ins: receiver, context.Context, *args, *reply.
  100. if mtype.NumIn() != 4 {
  101. if reportErr {
  102. log.Debug("method ", mname, " has wrong number of ins:", mtype.NumIn())
  103. }
  104. continue
  105. }
  106. // First arg must be context.Context
  107. ctxType := mtype.In(1)
  108. if !ctxType.Implements(typeOfContext) {
  109. if reportErr {
  110. log.Debug("method ", mname, " must use context.Context as the first parameter")
  111. }
  112. continue
  113. }
  114. // Second arg need not be a pointer.
  115. argType := mtype.In(2)
  116. if !isExportedOrBuiltinType(argType) {
  117. if reportErr {
  118. log.Info(mname, " parameter type not exported: ", argType)
  119. }
  120. continue
  121. }
  122. // Third arg must be a pointer.
  123. replyType := mtype.In(3)
  124. if replyType.Kind() != reflect.Ptr {
  125. if reportErr {
  126. log.Info("method", mname, " reply type not a pointer:", replyType)
  127. }
  128. continue
  129. }
  130. // Reply type must be exported.
  131. if !isExportedOrBuiltinType(replyType) {
  132. if reportErr {
  133. log.Info("method", mname, " reply type not exported:", replyType)
  134. }
  135. continue
  136. }
  137. // Method needs one out.
  138. if mtype.NumOut() != 1 {
  139. if reportErr {
  140. log.Info("method", mname, " has wrong number of outs:", mtype.NumOut())
  141. }
  142. continue
  143. }
  144. // The return type of the method must be error.
  145. if returnType := mtype.Out(0); returnType != typeOfError {
  146. if reportErr {
  147. log.Info("method", mname, " returns ", returnType.String(), " not error")
  148. }
  149. continue
  150. }
  151. methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
  152. reflectTypePools.Init(argType)
  153. reflectTypePools.Init(replyType)
  154. }
  155. return methods
  156. }
  157. func (s *InvokerImpl) HandleInvoker(ctx context.Context, req *Message) (res *Message, err error) {
  158. s.classMapMu.RLock()
  159. service := s.classMap[req.ClassName]
  160. if share.Trace {
  161. log.Debugf("server get service %+v for an request", service)
  162. }
  163. s.classMapMu.RUnlock()
  164. res = req.Clone()
  165. if service == nil {
  166. }
  167. mtype := service.method[req.MethodName]
  168. if mtype == nil {
  169. }
  170. args := reflectTypePools.Get(mtype.ArgType)
  171. if mtype.ArgType == reflect.TypeOf(req.Payload) {
  172. args = req.Payload
  173. }
  174. reply := reflectTypePools.Get(mtype.ReplyType)
  175. if mtype.ArgType.Kind() != reflect.Ptr {
  176. err = service.call(ctx, mtype, reflect.ValueOf(args).Elem(), reflect.ValueOf(reply))
  177. } else {
  178. err = service.call(ctx, mtype, reflect.ValueOf(args), reflect.ValueOf(reply))
  179. }
  180. reflectTypePools.Put(mtype.ArgType, args)
  181. if err != nil {
  182. if reply != nil {
  183. reflectTypePools.Put(mtype.ReplyType, reply)
  184. result := gconv.Map(reply)
  185. myerrors.HandlerErrorMsg(err, result)
  186. res.Payload = result
  187. }
  188. } else {
  189. // return reply to object pool
  190. reflectTypePools.Put(mtype.ReplyType, reply)
  191. res.Payload = reply
  192. }
  193. return res, nil
  194. }
  195. func (s *service) call(ctx context.Context, mtype *methodType, args, reply reflect.Value) (err error) {
  196. defer func() {
  197. if r := recover(); r != nil {
  198. buf := make([]byte, 4096)
  199. n := runtime.Stack(buf, false)
  200. buf = buf[:n]
  201. err = fmt.Errorf("[service internal error]: %v, method: %s, args: %+v, stack: %s",
  202. r, mtype.method.Name, args.Interface(), buf)
  203. log.Error(err)
  204. }
  205. }()
  206. function := mtype.method.Func
  207. // Invoke the method, providing a new value for the reply.
  208. returnValues := function.Call([]reflect.Value{s.rcvr, reflect.ValueOf(ctx), args, reply})
  209. // The return value for the method is an error.
  210. errInter := returnValues[0].Interface()
  211. if errInter != nil {
  212. return errInter.(error)
  213. }
  214. return nil
  215. }
  216. type Message struct {
  217. ClassName string
  218. MethodName string
  219. Metadata map[string]string
  220. Payload interface{}
  221. }
  222. // Clone clones from an message.
  223. func (m Message) Clone() *Message {
  224. return &Message{
  225. ClassName: m.ClassName,
  226. MethodName: m.MethodName,
  227. Metadata: m.Metadata,
  228. }
  229. }