| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- package dynamic
- import (
- "context"
- "fmt"
- "github.com/gogf/gf/errors/gerror"
- "github.com/gogf/gf/frame/g"
- "github.com/smallnest/rpcx/log"
- "github.com/smallnest/rpcx/share"
- "reflect"
- "runtime"
- "sync"
- "unicode"
- "unicode/utf8"
- )
- var Invoker *InvokerImpl
- func init() {
- Invoker = NewInvoker()
- }
- type InvokerImpl struct {
- classMapMu sync.RWMutex
- classMap map[string]*service
- }
- func NewInvoker() *InvokerImpl {
- return &InvokerImpl{
- classMap: make(map[string]*service),
- }
- }
- var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
- var typeOfContext = reflect.TypeOf((*context.Context)(nil)).Elem()
- type service struct {
- name string // name of service
- rcvr reflect.Value // receiver of methods for the service
- typ reflect.Type // type of the receiver
- method map[string]*methodType // registered methods
- }
- type methodType struct {
- sync.Mutex // protects counters
- method reflect.Method
- ArgType reflect.Type
- ReplyType reflect.Type
- // numCalls uint
- }
- func isExported(name string) bool {
- rune, _ := utf8.DecodeRuneInString(name)
- return unicode.IsUpper(rune)
- }
- func isExportedOrBuiltinType(t reflect.Type) bool {
- for t.Kind() == reflect.Ptr {
- t = t.Elem()
- }
- return isExported(t.Name()) || t.PkgPath() == ""
- }
- func (s *InvokerImpl) Register(class interface{}) error {
- s.classMapMu.Lock()
- defer s.classMapMu.Unlock()
- service := new(service)
- service.typ = reflect.TypeOf(class)
- service.rcvr = reflect.ValueOf(class)
- sname := reflect.Indirect(service.rcvr).Type().Name() // Type
- if sname == "" {
- errorStr := "InvokerImpl.Register: no service name for type " + service.typ.String()
- g.Log().Error(errorStr)
- return gerror.New(errorStr)
- }
- if !isExported(sname) {
- errorStr := "InvokerImpl.Register: type " + sname + " is not exported"
- g.Log().Error(errorStr)
- return gerror.New(errorStr)
- }
- service.name = sname
- // Install the methods
- service.method = suitableMethods(service.typ, true)
- if len(service.method) == 0 {
- var errorStr string
- method := suitableMethods(reflect.PtrTo(service.typ), false)
- if len(method) != 0 {
- errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
- } else {
- errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type"
- }
- g.Log().Error(errorStr)
- return gerror.New(errorStr)
- }
- s.classMap[service.name] = service
- return nil
- }
- func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
- methods := make(map[string]*methodType)
- for m := 0; m < typ.NumMethod(); m++ {
- method := typ.Method(m)
- mtype := method.Type
- mname := method.Name
- // Method must be exported.
- if method.PkgPath != "" {
- continue
- }
- // Method needs four ins: receiver, context.Context, *args, *reply.
- if mtype.NumIn() != 4 {
- if reportErr {
- log.Debug("method ", mname, " has wrong number of ins:", mtype.NumIn())
- }
- continue
- }
- // First arg must be context.Context
- ctxType := mtype.In(1)
- if !ctxType.Implements(typeOfContext) {
- if reportErr {
- log.Debug("method ", mname, " must use context.Context as the first parameter")
- }
- continue
- }
- // Second arg need not be a pointer.
- argType := mtype.In(2)
- if !isExportedOrBuiltinType(argType) {
- if reportErr {
- log.Info(mname, " parameter type not exported: ", argType)
- }
- continue
- }
- // Third arg must be a pointer.
- replyType := mtype.In(3)
- if replyType.Kind() != reflect.Ptr {
- if reportErr {
- log.Info("method", mname, " reply type not a pointer:", replyType)
- }
- continue
- }
- // Reply type must be exported.
- if !isExportedOrBuiltinType(replyType) {
- if reportErr {
- log.Info("method", mname, " reply type not exported:", replyType)
- }
- continue
- }
- // Method needs one out.
- if mtype.NumOut() != 1 {
- if reportErr {
- log.Info("method", mname, " has wrong number of outs:", mtype.NumOut())
- }
- continue
- }
- // The return type of the method must be error.
- if returnType := mtype.Out(0); returnType != typeOfError {
- if reportErr {
- log.Info("method", mname, " returns ", returnType.String(), " not error")
- }
- continue
- }
- methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
- reflectTypePools.Init(argType)
- reflectTypePools.Init(replyType)
- }
- return methods
- }
- func (s *InvokerImpl) HandleInvoker(ctx context.Context, req *Message) (res *Message, err error) {
- s.classMapMu.RLock()
- service := s.classMap[req.ClassName]
- if share.Trace {
- log.Debugf("server get service %+v for an request", service)
- }
- s.classMapMu.RUnlock()
- res = req.Clone()
- if service == nil {
- }
- mtype := service.method[req.MethodName]
- if mtype == nil {
- }
- args := reflectTypePools.Get(mtype.ArgType)
- if mtype.ArgType == reflect.TypeOf(req.Payload) {
- args = req.Payload
- }
- reply := reflectTypePools.Get(mtype.ReplyType)
- if mtype.ArgType.Kind() != reflect.Ptr {
- err = service.call(ctx, mtype, reflect.ValueOf(args).Elem(), reflect.ValueOf(reply))
- } else {
- err = service.call(ctx, mtype, reflect.ValueOf(args), reflect.ValueOf(reply))
- }
- reflectTypePools.Put(mtype.ArgType, args)
- if err != nil {
- if reply != nil {
- reflectTypePools.Put(mtype.ReplyType, reply)
- res.Payload = reply
- }
- } else {
- // return reply to object pool
- reflectTypePools.Put(mtype.ReplyType, reply)
- res.Payload = reply
- }
- return res, nil
- }
- func (s *service) call(ctx context.Context, mtype *methodType, args, reply reflect.Value) (err error) {
- defer func() {
- if r := recover(); r != nil {
- buf := make([]byte, 4096)
- n := runtime.Stack(buf, false)
- buf = buf[:n]
- err = fmt.Errorf("[service internal error]: %v, method: %s, args: %+v, stack: %s",
- r, mtype.method.Name, args.Interface(), buf)
- log.Error(err)
- }
- }()
- function := mtype.method.Func
- // Invoke the method, providing a new value for the reply.
- returnValues := function.Call([]reflect.Value{s.rcvr, reflect.ValueOf(ctx), args, reply})
- // The return value for the method is an error.
- errInter := returnValues[0].Interface()
- if errInter != nil {
- return errInter.(error)
- }
- return nil
- }
- type Message struct {
- ClassName string
- MethodName string
- Metadata map[string]string
- Payload interface{}
- }
- // Clone clones from an message.
- func (m Message) Clone() *Message {
- return &Message{
- ClassName: m.ClassName,
- MethodName: m.MethodName,
- Metadata: m.Metadata,
- }
- }
|