// Package micro_srv contains helpers for building rpcx micro-service clients
// and servers, a stream (file upload) handler that dispatches to dynamically
// registered beans, token validation for incoming rpcx requests, and accessors
// for the request metadata carried in a context.
package micro_srv

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"path"
	"reflect"
	"strconv"
	"strings"
	"time"

	"dashoo.cn/common_definition/auth"
	"dashoo.cn/opms_libary/dynamic"
	"dashoo.cn/opms_libary/gtoken"
	"dashoo.cn/opms_libary/mutipart"
	"dashoo.cn/opms_libary/request"
	"github.com/gogf/gf/encoding/gbase64"
	"github.com/gogf/gf/encoding/gjson"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/net/ghttp"
	"github.com/gogf/gf/text/gstr"
	"github.com/rcrowley/go-metrics"
	"github.com/smallnest/rpcx/client"
	"github.com/smallnest/rpcx/protocol"
	"github.com/smallnest/rpcx/server"
	"github.com/smallnest/rpcx/serverplugin"
	"github.com/smallnest/rpcx/share"
)

// fileTransfer is the reflected mutipart.FileTransfer interface type, computed
// once so handDynamicService can test beans with reflect.Type.Implements.
var fileTransfer = reflect.TypeOf((*mutipart.FileTransfer)(nil)).Elem()

// InitMicroSrvClient builds an rpcx client for serviceName.
//
// The configuration value stored under key is split on ",":
//   - two parts: the second part is dialled directly ("tcp@"+addr, peer to peer);
//   - otherwise: the first part is passed to Consul discovery together with
//     serviceName, provided the registry type is "consul".
//
// args is optional; when exactly two values are given they override the
// configured registry type and registry address, in that order.
//
// It returns nil when no client can be built: the registry type is not
// "consul" (and no direct address is configured) or discovery creation failed.
// Callers must check for nil before use.
func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
	reg := g.Config().GetString("service_registry.registry")
	registryAddr := g.Config().GetString("service_registry.server-addr")
	if len(args) == 2 {
		reg, registryAddr = args[0], args[1]
	}

	parts := strings.Split(g.Config().GetString(key), ",")
	srvName := parts[0]

	if len(parts) == 2 {
		// Point-to-point: connect straight to the configured address.
		d, err := client.NewPeer2PeerDiscovery("tcp@"+parts[1], "")
		if err != nil {
			g.Log().Error(err)
			return nil
		}
		return client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
	}

	if reg == "consul" {
		// Service discovery through Consul.
		// An etcd alternative existed here and is currently disabled:
		//d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
		d, err := client.NewConsulDiscovery(srvName, serviceName, []string{registryAddr}, nil)
		if err != nil {
			// A failed discovery must not be handed to NewXClient.
			g.Log().Error(err)
			return nil
		}
		return client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
	}

	return nil
}

// CreateAndInitService creates an rpcx server configured from the "setting"
// and "service_registry" configuration sections.
//
// When "setting.bind-mutipart-addr" is non-empty a stream service backed by
// streamHandler is enabled on that address. When the registry type is
// "consul" the server is registered under basePath, advertising either the
// bind address or, if "setting.need-advertise-addr" is true,
// "setting.advertise-addr".
func CreateAndInitService(basePath string) *server.Server {
	cfg := g.Config()
	srvAddr := cfg.GetString("setting.bind-addr")
	fileAddr := cfg.GetString("setting.bind-mutipart-addr")
	registryAddr := cfg.GetString("service_registry.server-addr")

	s := server.NewServer()
	if fileAddr != "" {
		p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
		s.EnableStreamService(share.StreamServiceName, p)
	}

	advertiseAddr := srvAddr
	if cfg.GetBool("setting.need-advertise-addr") {
		advertiseAddr = cfg.GetString("setting.advertise-addr")
	}
	g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)

	reg := cfg.GetString("service_registry.registry")
	// etcd registration is currently disabled:
	//if reg == "etcd" {
	//	addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
	//}
	if reg == "consul" {
		addConsulRegistryPlugin(s, basePath, advertiseAddr, registryAddr)
	}
	return s
}

// addConsulRegistryPlugin starts a Consul register plugin advertising
// "tcp@"+srvAddr under basePath and adds it to the server's plugins.
// A failure to start the plugin is logged with Fatal.
func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
	r := &serverplugin.ConsulRegisterPlugin{
		ServiceAddress: "tcp@" + srvAddr,
		ConsulServers:  []string{consulAddr},
		BasePath:       basePath,
		Metrics:        metrics.NewRegistry(),
		UpdateInterval: time.Minute,
	}
	if err := r.Start(); err != nil {
		g.Log().Fatal(err)
	}
	g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
	s.Plugins.Add(r)
}

// streamErrorPayload renders err as the JSON error body sent back on the
// stream connection: {"code":500,"data":"<message>"}.
func streamErrorPayload(err error) []byte {
	resp := map[string]interface{}{
		"code": 500,
		"data": err.Error(),
	}
	// Marshalling a map of an int and a string cannot fail.
	payload, _ := json.Marshal(resp)
	return payload
}

// streamHandler receives one uploaded file over conn and hands it to the bean
// named by args.Meta["bean_name"].
//
// Metadata keys read: "file_name" (only its extension is used, for the
// temporary file name), "file_size" (number of bytes to receive) and
// "bean_name". Exactly file_size bytes are copied into a temporary file, the
// file is rewound, and the bean's JSON result (or a code-500 error body) is
// written back on conn. The temporary file is closed and removed and the
// connection is closed when the handler returns.
func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
	defer conn.Close()

	tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+path.Ext(args.Meta["file_name"]))
	if err != nil {
		g.Log().Error(err)
		return
	}
	defer func() {
		// Close before removing so the descriptor is not leaked.
		tmpFile.Close()
		os.Remove(tmpFile.Name())
	}()

	size, err := strconv.Atoi(args.Meta["file_size"])
	if err != nil || size < 0 {
		// Without a usable size the end of the upload cannot be detected.
		bad := fmt.Errorf("invalid file_size %q", args.Meta["file_size"])
		g.Log().Error(bad)
		if _, werr := conn.Write(streamErrorPayload(bad)); werr != nil {
			g.Log().Error(werr)
		}
		return
	}

	buf := make([]byte, 4096)
	for received := 0; received < size; {
		// Never read past the announced size.
		chunk := buf
		if remaining := size - received; remaining < len(chunk) {
			chunk = chunk[:remaining]
		}
		n, readErr := conn.Read(chunk)
		if n > 0 {
			// Only the n bytes actually read are valid.
			if _, werr := tmpFile.Write(chunk[:n]); werr != nil {
				g.Log().Error(werr)
				return
			}
			received += n
		}
		if readErr != nil && received < size {
			// The peer went away (or the read failed) before the whole file arrived.
			g.Log().Error(readErr)
			return
		}
	}

	// Rewind so a handler reading from the *os.File sees the uploaded content.
	if _, err = tmpFile.Seek(0, 0); err != nil {
		g.Log().Error(err)
		return
	}

	var result []byte
	beanName, ok := args.Meta["bean_name"]
	if ok {
		// handDynamicService logs its own failures.
		result, err = handDynamicService(beanName, tmpFile, args.Meta)
	} else {
		err = fmt.Errorf("Service[%s] 没有实现HandleFileTransfer接口", beanName)
		g.Log().Error(err)
	}
	if err != nil {
		result = streamErrorPayload(err)
	}
	if _, err = conn.Write(result); err != nil {
		g.Log().Error(err)
	}
}

// handDynamicService looks up beanName in dynamic.BeanFactory and, if the bean
// implements mutipart.FileTransfer, calls HandleFileTransfer(file, param) and
// returns its result marshalled as JSON.
//
// An error is returned (and logged) when the bean is not registered, does not
// implement the interface, or its handler fails; in the last case the returned
// error carries a generic message and the underlying cause is only logged.
func handDynamicService(beanName string, file *os.File, param map[string]string) ([]byte, error) {
	bean := dynamic.BeanFactory.GetBean(beanName)
	if !bean.IsValid() {
		ce := fmt.Errorf("Service[%s] 没有注册", beanName)
		g.Log().Error(ce)
		return nil, ce
	}

	if !bean.CanInterface() || !bean.Type().Implements(fileTransfer) {
		ce := fmt.Errorf("Service[%s] 没有实现HandleFileTransfer接口", beanName)
		g.Log().Error(ce)
		return nil, ce
	}

	service := bean.Interface().(mutipart.FileTransfer)
	resp, err := service.HandleFileTransfer(file, param)
	if err != nil {
		ce := fmt.Errorf("Service[%s] 文件处理异常", beanName)
		g.Log().Error(ce, err)
		return nil, ce
	}
	return json.Marshal(resp)
}

// HandleAuth authenticates an rpcx request.
//
// The request path is "/"+ServicePath+"/"+ServiceMethod. Paths matched by
// authExcludePaths are let through untouched. For all other paths the request
// metadata gets "authExclude" = "false", the token is validated (with
// gtoken.GFToken when the context value "NotAuthSrv" is the bool true,
// otherwise through validToken), and on success the validation response data
// is stored in the metadata under "userInfo".
//
// It returns an "InvalidToken" error when validation yields a non-zero code.
func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
	reqPath := "/" + req.ServicePath + "/" + req.ServiceMethod
	if !authPath(reqPath, authExcludePaths) {
		return nil
	}

	// Writing to a nil map panics, so make sure the metadata map exists.
	if req.Metadata == nil {
		req.Metadata = make(map[string]string)
	}
	req.Metadata["authExclude"] = "false"

	var rsp gtoken.Resp
	if useLocal, ok := ctx.Value("NotAuthSrv").(bool); ok && useLocal {
		rsp = gtoken.GFToken.ValidToken(token)
	} else {
		rsp = validToken(token)
	}
	if rsp.Code != 0 {
		return errors.New("InvalidToken")
	}

	req.Metadata["userInfo"] = rsp.DataString()
	return nil
}

// authPath reports whether urlPath must be authenticated (true) or is covered
// by one of authExcludePaths (false).
//
// One trailing "/" is ignored on both sides. An exclude entry ending in "/*"
// matches the path before the "/*" itself and everything below it; any other
// entry must match the whole path.
func authPath(urlPath string, authExcludePaths []string) bool {
	urlPath = strings.TrimSuffix(urlPath, "/")

	for _, excludePath := range authExcludePaths {
		if strings.HasSuffix(excludePath, "/*") {
			// Prefix rule. Match on a path-segment boundary so that "/Auth/*"
			// does not also exempt an unrelated path such as "/AuthAdmin/x".
			prefix := strings.TrimSuffix(excludePath, "/*")
			if urlPath == prefix || gstr.HasPrefix(urlPath, prefix+"/") {
				return false
			}
			continue
		}
		// Exact rule.
		if urlPath == strings.TrimSuffix(excludePath, "/") {
			return false
		}
	}
	return true
}

// validToken validates token by calling "ValidToken" on the "Auth" service
// (client configured under "micro_srv.auth") and converts the reply into a
// gtoken.Resp. An empty token, an unavailable client, or a failed call yields
// code 401.
func validToken(token string) gtoken.Resp {
	grsp := gtoken.Resp{}
	if token == "" {
		grsp.Code = 401
		grsp.Msg = "valid token empty"
		return grsp
	}

	authService := InitMicroSrvClient("Auth", "micro_srv.auth")
	if authService == nil {
		// InitMicroSrvClient returns nil when no client can be built.
		g.Log().Error("auth service client unavailable")
		grsp.Code = 401
		return grsp
	}
	defer authService.Close()

	rsp := &auth.Response{}
	if err := authService.Call(context.TODO(), "ValidToken", token, rsp); err != nil {
		g.Log().Error(err)
		grsp.Code = 401
		return grsp
	}

	grsp.Code = int(rsp.Code)
	grsp.Msg = rsp.Msg
	grsp.Data = rsp.Data
	return grsp
}

// reqMetaData returns the rpcx request metadata stored in ctx under
// share.ReqMetaDataKey, or a nil map when it is absent or of another type.
// Lookups on the nil map simply report "not found".
func reqMetaData(ctx context.Context) map[string]string {
	meta, _ := ctx.Value(share.ReqMetaDataKey).(map[string]string)
	return meta
}

// IsAuthExclude reports whether the request was excluded from authentication:
// true when the "authExclude" metadata entry is missing or equals "true".
func IsAuthExclude(ctx context.Context) bool {
	flag, ok := reqMetaData(ctx)["authExclude"]
	return !ok || flag == "true"
}

// GetUserInfo decodes the "userInfo" metadata entry of ctx into a
// request.UserInfo. It returns an error when the entry is missing or cannot
// be decoded.
func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
	userStr, ok := reqMetaData(ctx)["userInfo"]
	if !ok {
		return request.UserInfo{}, errors.New("不存在UserInfo数据")
	}
	userInfo, err := getUserInfoDataString(userStr)
	if err != nil {
		return request.UserInfo{}, err
	}
	return userInfo, nil
}

// GetTenant returns the "tenant" metadata entry of ctx, or an error when it
// is missing.
func GetTenant(ctx context.Context) (string, error) {
	tenant, ok := reqMetaData(ctx)["tenant"]
	if !ok {
		return "", errors.New("不存在租户码")
	}
	return tenant, nil
}

// GetReqMethod returns the "reqMethod" metadata entry of ctx, or an error
// when it is missing.
func GetReqMethod(ctx context.Context) (string, error) {
	reqMethod, ok := reqMetaData(ctx)["reqMethod"]
	if !ok {
		return "", errors.New("获取请求方式异常")
	}
	return reqMethod, nil
}

// GetToken returns the "__AUTH" metadata entry of ctx, or an error when it is
// missing.
func GetToken(ctx context.Context) (string, error) {
	token, ok := reqMetaData(ctx)["__AUTH"]
	if !ok {
		return "", errors.New("token获取失败")
	}
	return token, nil
}

// GetBrowserInfo returns the "clientIP" metadata entry and the
// base64-decoded "userAgent" metadata entry of ctx. It returns an error when
// either entry is missing or the user agent cannot be decoded.
func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
	meta := reqMetaData(ctx)

	ip, ok := meta["clientIP"]
	if !ok {
		return "", "", errors.New("BrowserInfo获取失败")
	}
	encodedAgent, ok := meta["userAgent"]
	if !ok {
		return "", "", errors.New("BrowserInfo获取失败")
	}

	agent, err := gbase64.DecodeToString(encodedAgent)
	if err != nil {
		return "", "", err
	}
	return ip, agent, nil
}

// getUserInfoDataString parses userInfoString as JSON and converts its "data"
// member into a request.UserInfo. Failures are logged and returned.
func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
	j, err := gjson.DecodeToJson([]byte(userInfoString))
	if err != nil {
		g.Log().Error(err)
		return request.UserInfo{}, err
	}
	j.SetViolenceCheck(true)

	var userInfo request.UserInfo
	if err = j.GetStruct("data", &userInfo); err != nil {
		g.Log().Error(err)
		return request.UserInfo{}, err
	}
	return userInfo, nil
}

// SetTenant returns a background context whose rpcx request metadata contains
// only the given tenant code (for use by traditional Web API callers).
func SetTenant(tenant string) context.Context {
	metadata := map[string]string{"tenant": tenant}
	return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
}

// SetTenantAndAuth forwards the request's bearer token (if any) to the rpcx
// client via Auth and returns a context carrying the request's tenant code
// (for use by traditional Web API callers).
func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
	if token := getRequestToken(r); token != "" {
		client.Auth(token)
	}
	return SetTenant(request.GetTenant(r))
}

// getRequestToken extracts the token from an "Authorization: Bearer <token>"
// header. It returns "" when the header is absent, uses another scheme, or
// carries no token.
func getRequestToken(r *ghttp.Request) string {
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" {
		return ""
	}
	parts := strings.SplitN(authHeader, " ", 2)
	if len(parts) != 2 || parts[0] != "Bearer" {
		return ""
	}
	return parts[1]
}