package micro_srv import ( "context" "encoding/json" "errors" "io/ioutil" "net" "os" "path" "strconv" "strings" "time" "dashoo.cn/common_definition/comm_def" "dashoo.cn/opms_libary/dynamic" "dashoo.cn/opms_libary/gtoken" "dashoo.cn/opms_libary/multipart" "dashoo.cn/opms_libary/myerrors" "dashoo.cn/opms_libary/request" "github.com/gogf/gf/encoding/gbase64" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/errors/gerror" "github.com/gogf/gf/frame/g" "github.com/gogf/gf/net/ghttp" "github.com/gogf/gf/text/gstr" "github.com/gogf/gf/util/gconv" "github.com/rcrowley/go-metrics" consulclient "github.com/rpcxio/rpcx-consul/client" "github.com/rpcxio/rpcx-consul/serverplugin" "github.com/smallnest/rpcx/client" "github.com/smallnest/rpcx/protocol" "github.com/smallnest/rpcx/server" "github.com/smallnest/rpcx/share" ) // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) { reg := g.Config().GetString("service_registry.registry") etcdAddr := g.Config().GetString("service_registry.server-addr") if len(args) == 2 { reg = args[0] etcdAddr = args[1] } config := g.Config().GetString(key) arr := strings.Split(config, ",") srvName := arr[0] if len(arr) == 2 { // 点对点 直连 d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "") c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption) return c } else { if reg == "consul" { // 服务发现使用consul //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil) d, _ := consulclient.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil) //d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil) c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption) return c } } return nil } func CreateAndInitService(basePath string) *server.Server { srvAddr := g.Config().GetString("setting.bind-addr") fileAddr := g.Config().GetString("setting.bind-mutipart-addr") etcdAddr := g.Config().GetString("service_registry.server-addr") s := server.NewServer() if fileAddr != "" { p := server.NewStreamService(fileAddr, streamHandler, nil, 1000) s.EnableStreamService(share.StreamServiceName, p) } advertiseAddr := srvAddr if g.Config().GetBool("setting.need-advertise-addr") { advertiseAddr = g.Config().GetString("setting.advertise-addr") } g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr) reg := g.Config().GetString("service_registry.registry") //if reg == "etcd" { // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr) //} if reg == "consul" { addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr) } return s } func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) { r := &serverplugin.ConsulRegisterPlugin{ ServiceAddress: "tcp@" + srvAddr, ConsulServers: []string{consulAddr}, BasePath: basePath, Metrics: metrics.NewRegistry(), UpdateInterval: time.Minute, } err := r.Start() if err != nil { g.Log().Fatal(err) } g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr) s.Plugins.Add(r) } func streamHandler(conn net.Conn, args *share.StreamServiceArgs) { defer conn.Close() ctx := context.Background() ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta) token, err := GetToken(ctx) if err != nil { result, _ := handError(err) conn.Write(result) return } resp := validToken(ctx, token) if resp.Code != 0 { result, _ := handError(myerrors.AuthError()) conn.Write(result) return } args.Meta["userInfo"] = resp.DataString() ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta) fileName := args.Meta["fileName"] //获取文件后缀 suffix := path.Ext(fileName) tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix) if err != nil { g.Log().Error(err) return } defer os.Remove(tmpFile.Name()) fileSize := args.Meta["fileSize"] size, _ := strconv.Atoi(fileSize) total := 0 buf := make([]byte, 4096) for { n, _ := conn.Read(buf) total += n _, err = tmpFile.Write(buf) if err != nil { g.Log().Error(err) return } //如果实际总接受字节数与客户端给的要传输字节数相等,说明传输完毕 if total == size { result := make([]byte, 0) className, _ := args.Meta["reqService"] methodName, _ := args.Meta["reqMethod"] message := new(dynamic.Message) message.ClassName = className message.MethodName = methodName message.Payload = &multipart.MultipartFile{FileName: fileName, FileSize: gconv.Int64(fileSize), File: tmpFile, Meta: args.Meta} rsp, err := dynamic.Invoker.HandleInvoker(ctx, message) if err != nil { resp := make(map[string]interface{}) resp["code"] = 500 resp["data"] = err.Error() result, _ = json.Marshal(resp) } else { result, _ = json.Marshal(rsp.Payload) } conn.Write(result) break } } conn.Close() } func getTenant(msg *protocol.Message) string { var tenant string if msg.Metadata != nil { tenant = msg.Metadata["tenant"] } return tenant } // HandleAuth 处理Auth认证 func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error { reqPath := "/" + req.ServicePath + "/" + req.ServiceMethod tenant := getTenant(req) g.Log().Info("Received " + reqPath + " request @ " + tenant) if authPath(reqPath, authExcludePaths) { req.Metadata["authExclude"] = "false" var rsp gtoken.Resp notAuthSrv := ctx.Value("NotAuthSrv") if notAuthSrv != nil && notAuthSrv.(bool) { rsp = gtoken.GFToken.ValidToken(token) } else { ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{"tenant": tenant}) rsp = validToken(ctx, token) } if rsp.Code != 0 && rsp.Code != 200 { return myerrors.AuthError() } if req.Metadata != nil { req.Metadata["userInfo"] = rsp.DataString() } return nil } return nil } // 判断路径是否需要进行认证拦截 // return true 需要认证 func authPath(urlPath string, authExcludePaths []string) bool { // 去除后斜杠 if strings.HasSuffix(urlPath, "/") { urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1) } // 排除路径处理,到这里nextFlag为true for _, excludePath := range authExcludePaths { tmpPath := excludePath // 前缀匹配 if strings.HasSuffix(tmpPath, "/*") { tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2) if gstr.HasPrefix(urlPath, tmpPath) { // 前缀匹配不拦截 return false } } else { // 全路径匹配 if strings.HasSuffix(tmpPath, "/") { tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1) } if urlPath == tmpPath { // 全路径匹配不拦截 return false } } } return true } // 验证token func validToken(ctx context.Context, token string) gtoken.Resp { grsp := gtoken.Resp{} if token == "" { grsp.Code = 401 grsp.Msg = "valid token empty" return grsp } authService := InitMicroSrvClient("Auth", "micro_srv.auth") defer authService.Close() rsp := &comm_def.CommonMsg{} err := authService.Call(ctx, "ValidToken", token, rsp) if err != nil { g.Log().Error(err) grsp.Code = 401 return grsp } grsp.Code = int(rsp.Code) grsp.Msg = rsp.Msg grsp.Data = rsp.Data return grsp } // IsAuthExclude 是否进行auth验证 func IsAuthExclude(ctx context.Context) bool { reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string) flag, ok := reqMeta["authExclude"] if !ok || flag == "true" { return true } return false } // GetUserInfo 从context中获取UserInfo func GetUserInfo(ctx context.Context) (request.UserInfo, error) { reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string) userStr, ok := reqMeta["userInfo"] if !ok { return request.UserInfo{}, errors.New("用户信息获取失败,请重新登录。") } userInfo, err := getUserInfoDataString(userStr) if err != nil { return request.UserInfo{}, errors.New("用户信息解码失败。") } return userInfo, nil } // GetTenant 从context中获取租户码 func GetTenant(ctx context.Context) (string, error) { reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string) tenant, ok := reqMeta["tenant"] if !ok { return "", errors.New("不存在租户码") } return tenant, nil } // GetReqMethod 从context中获取请求方式 func GetReqMethod(ctx context.Context) (string, error) { reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string) reqMethod, ok := reqMeta["reqMethod"] if !ok { return "", errors.New("获取请求方式异常") } return reqMethod, nil } // GetToken 从context中获取Token func GetToken(ctx context.Context) (string, error) { reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string) token, ok := reqMeta["__AUTH"] if !ok { return "", errors.New("token获取失败") } return token, nil } // GetBrowserInfo 从context中获取ClientIP和UserAgent func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) { reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string) clientIP, ok := reqMeta["clientIP"] if !ok { return "", "", errors.New("BrowserInfo获取失败") } userAgent, ok = reqMeta["userAgent"] if !ok { return "", "", errors.New("BrowserInfo获取失败") } userAgent, err = gbase64.DecodeToString(userAgent) return } // getUserInfoDataString 从userInfo字符串转换成对象 func getUserInfoDataString(userInfoString string) (request.UserInfo, error) { var userInfo request.UserInfo //uuid := "" if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil { g.Log().Error(err) return userInfo, err } else { j.SetViolenceCheck(true) err = j.GetStruct("data", &userInfo) if err != nil { g.Log().Error(err) return userInfo, err } } return userInfo, nil } // SetTenant 设置租户码(传统WebAPI调用使用) func SetTenant(tenant string) context.Context { metadata := map[string]string{"tenant": tenant} return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata) } // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用) func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context { // 处理Auth token := getRequestToken(r) if token != "" { client.Auth(token) } // 处理租户码 tenant := request.GetTenant(r) metadata := map[string]string{"tenant": tenant} return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata) } // 解析token,若无,返回空 func getRequestToken(r *ghttp.Request) string { authHeader := r.Header.Get("Authorization") if authHeader != "" { parts := strings.SplitN(authHeader, " ", 2) if !(len(parts) == 2 && parts[0] == "Bearer") { return "" } else if parts[1] == "" { return "" } return parts[1] } return "" } func handError(err error) ([]byte, error) { resp := make(map[string]interface{}) resp["code"] = gerror.Code(err).Code() resp["data"] = gerror.Code(err).Message() return json.Marshal(resp) }