package dynamic import ( "context" "fmt" "github.com/gogf/gf/errors/gerror" "github.com/gogf/gf/frame/g" "github.com/smallnest/rpcx/log" "github.com/smallnest/rpcx/share" "reflect" "runtime" "sync" "unicode" "unicode/utf8" ) var Invoker *InvokerImpl func init() { Invoker = NewInvoker() } type InvokerImpl struct { classMapMu sync.RWMutex classMap map[string]*service } func NewInvoker() *InvokerImpl { return &InvokerImpl{ classMap: make(map[string]*service), } } var typeOfError = reflect.TypeOf((*error)(nil)).Elem() var typeOfContext = reflect.TypeOf((*context.Context)(nil)).Elem() type service struct { name string // name of service rcvr reflect.Value // receiver of methods for the service typ reflect.Type // type of the receiver method map[string]*methodType // registered methods } type methodType struct { sync.Mutex // protects counters method reflect.Method ArgType reflect.Type ReplyType reflect.Type // numCalls uint } func isExported(name string) bool { rune, _ := utf8.DecodeRuneInString(name) return unicode.IsUpper(rune) } func isExportedOrBuiltinType(t reflect.Type) bool { for t.Kind() == reflect.Ptr { t = t.Elem() } return isExported(t.Name()) || t.PkgPath() == "" } func (s *InvokerImpl) Register(class interface{}) error { s.classMapMu.Lock() defer s.classMapMu.Unlock() service := new(service) service.typ = reflect.TypeOf(class) service.rcvr = reflect.ValueOf(class) sname := reflect.Indirect(service.rcvr).Type().Name() // Type if sname == "" { errorStr := "InvokerImpl.Register: no service name for type " + service.typ.String() g.Log().Error(errorStr) return gerror.New(errorStr) } if !isExported(sname) { errorStr := "InvokerImpl.Register: type " + sname + " is not exported" g.Log().Error(errorStr) return gerror.New(errorStr) } service.name = sname // Install the methods service.method = suitableMethods(service.typ, true) if len(service.method) == 0 { var errorStr string method := suitableMethods(reflect.PtrTo(service.typ), false) if len(method) != 0 { errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)" } else { errorStr = "rpcx.Register: type " + sname + " has no exported methods of suitable type" } g.Log().Error(errorStr) return gerror.New(errorStr) } s.classMap[service.name] = service return nil } func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType { methods := make(map[string]*methodType) for m := 0; m < typ.NumMethod(); m++ { method := typ.Method(m) mtype := method.Type mname := method.Name // Method must be exported. if method.PkgPath != "" { continue } // Method needs four ins: receiver, context.Context, *args, *reply. if mtype.NumIn() != 4 { if reportErr { log.Debug("method ", mname, " has wrong number of ins:", mtype.NumIn()) } continue } // First arg must be context.Context ctxType := mtype.In(1) if !ctxType.Implements(typeOfContext) { if reportErr { log.Debug("method ", mname, " must use context.Context as the first parameter") } continue } // Second arg need not be a pointer. argType := mtype.In(2) if !isExportedOrBuiltinType(argType) { if reportErr { log.Info(mname, " parameter type not exported: ", argType) } continue } // Third arg must be a pointer. replyType := mtype.In(3) if replyType.Kind() != reflect.Ptr { if reportErr { log.Info("method", mname, " reply type not a pointer:", replyType) } continue } // Reply type must be exported. if !isExportedOrBuiltinType(replyType) { if reportErr { log.Info("method", mname, " reply type not exported:", replyType) } continue } // Method needs one out. if mtype.NumOut() != 1 { if reportErr { log.Info("method", mname, " has wrong number of outs:", mtype.NumOut()) } continue } // The return type of the method must be error. if returnType := mtype.Out(0); returnType != typeOfError { if reportErr { log.Info("method", mname, " returns ", returnType.String(), " not error") } continue } methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType} reflectTypePools.Init(argType) reflectTypePools.Init(replyType) } return methods } func (s *InvokerImpl) HandleInvoker(ctx context.Context, req *Message) (res *Message, err error) { s.classMapMu.RLock() service := s.classMap[req.ClassName] if share.Trace { log.Debugf("server get service %+v for an request", service) } s.classMapMu.RUnlock() res = req.Clone() if service == nil { } mtype := service.method[req.MethodName] if mtype == nil { } args := reflectTypePools.Get(mtype.ArgType) if mtype.ArgType == reflect.TypeOf(req.Payload) { args = req.Payload } reply := reflectTypePools.Get(mtype.ReplyType) if mtype.ArgType.Kind() != reflect.Ptr { err = service.call(ctx, mtype, reflect.ValueOf(args).Elem(), reflect.ValueOf(reply)) } else { err = service.call(ctx, mtype, reflect.ValueOf(args), reflect.ValueOf(reply)) } reflectTypePools.Put(mtype.ArgType, args) if err != nil { if reply != nil { reflectTypePools.Put(mtype.ReplyType, reply) res.Payload = reply } } else { // return reply to object pool reflectTypePools.Put(mtype.ReplyType, reply) res.Payload = reply } return res, nil } func (s *service) call(ctx context.Context, mtype *methodType, args, reply reflect.Value) (err error) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) n := runtime.Stack(buf, false) buf = buf[:n] err = fmt.Errorf("[service internal error]: %v, method: %s, args: %+v, stack: %s", r, mtype.method.Name, args.Interface(), buf) log.Error(err) } }() function := mtype.method.Func // Invoke the method, providing a new value for the reply. returnValues := function.Call([]reflect.Value{s.rcvr, reflect.ValueOf(ctx), args, reply}) // The return value for the method is an error. errInter := returnValues[0].Interface() if errInter != nil { return errInter.(error) } return nil } type Message struct { ClassName string MethodName string Metadata map[string]string Payload interface{} } // Clone clones from an message. func (m Message) Clone() *Message { return &Message{ ClassName: m.ClassName, MethodName: m.MethodName, Metadata: m.Metadata, } }