// Package micro_srv contains helpers for building rpcx micro-service clients
// and servers (with optional consul registration), a stream handler for
// multipart-style file uploads, and accessors for the request metadata that
// is carried in the context.
package micro_srv

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"path"
	"strconv"
	"strings"
	"time"

	"dashoo.cn/common_definition/comm_def"
	"dashoo.cn/opms_libary/dynamic"
	"dashoo.cn/opms_libary/gtoken"
	"dashoo.cn/opms_libary/multipart"
	"dashoo.cn/opms_libary/myerrors"
	"dashoo.cn/opms_libary/request"
	"github.com/gogf/gf/encoding/gbase64"
	"github.com/gogf/gf/encoding/gjson"
	"github.com/gogf/gf/errors/gerror"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/net/ghttp"
	"github.com/gogf/gf/text/gstr"
	"github.com/gogf/gf/util/gconv"
	"github.com/rcrowley/go-metrics"
	consulclient "github.com/rpcxio/rpcx-consul/client"
	"github.com/rpcxio/rpcx-consul/serverplugin"
	"github.com/smallnest/rpcx/client"
	"github.com/smallnest/rpcx/protocol"
	"github.com/smallnest/rpcx/server"
	"github.com/smallnest/rpcx/share"
)

// InitMicroSrvClient builds an rpcx client for serviceName.
//
// key names a configuration entry whose value is either "srvName" or
// "srvName,host:port". When the value has exactly two comma-separated parts
// the client connects directly (peer-to-peer) to the given address; otherwise
// the service is looked up through the registry, and only "consul" is handled.
//
// args is optional. When exactly two values are supplied they override the
// configured registry type and registry server address, in that order.
//
// The result is nil when no supported registry is configured or when the
// discovery could not be created (the error is logged).
func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
	reg := g.Config().GetString("service_registry.registry")
	registryAddr := g.Config().GetString("service_registry.server-addr")
	if len(args) == 2 {
		reg, registryAddr = args[0], args[1]
	}

	parts := strings.Split(g.Config().GetString(key), ",")
	if len(parts) == 2 {
		// Peer-to-peer: connect straight to the configured address.
		d, err := client.NewPeer2PeerDiscovery("tcp@"+parts[1], "")
		if err != nil {
			g.Log().Error(err)
			return nil
		}
		return client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
	}

	if reg == "consul" {
		// Service discovery through consul; parts[0] is the base path.
		d, err := consulclient.NewConsulDiscovery(parts[0], serviceName, []string{registryAddr}, nil)
		if err != nil {
			g.Log().Error(err)
			return nil
		}
		return client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
	}
	return nil
}

// CreateAndInitService creates an rpcx server configured from the "setting"
// and "service_registry" configuration sections.
//
// When the registry type is "consul" the server is registered under basePath,
// advertising either the bind address or, if "setting.need-advertise-addr" is
// true, "setting.advertise-addr". When "setting.bind-mutipart-addr" is set, a
// stream service backed by streamHandler is enabled on that address.
func CreateAndInitService(basePath string) *server.Server {
	srvAddr := g.Config().GetString("setting.bind-addr")
	fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
	registryAddr := g.Config().GetString("service_registry.server-addr")

	s := server.NewServer()

	advertiseAddr := srvAddr
	if g.Config().GetBool("setting.need-advertise-addr") {
		advertiseAddr = g.Config().GetString("setting.advertise-addr")
	}
	g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)

	// Only the consul registry is wired up.
	if g.Config().GetString("service_registry.registry") == "consul" {
		addConsulRegistryPlugin(s, basePath, advertiseAddr, registryAddr)
	}

	if fileAddr != "" {
		// NOTE(review): the trailing 1000 is passed through unchanged; its
		// unit/meaning should be confirmed against the rpcx stream service API.
		p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
		s.EnableStreamService(share.StreamServiceName, p)
	}
	return s
}

// addConsulRegistryPlugin starts a consul register plugin for srvAddr under
// basePath and attaches it to s. A start failure is logged with Fatal.
func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
	r := &serverplugin.ConsulRegisterPlugin{
		ServiceAddress: "tcp@" + srvAddr,
		ConsulServers:  []string{consulAddr},
		BasePath:       basePath,
		Metrics:        metrics.NewRegistry(),
		UpdateInterval: time.Minute,
	}
	if err := r.Start(); err != nil {
		g.Log().Fatal(err)
	}
	g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
	s.Plugins.Add(r)
}

// streamFile is one uploaded file received over the stream connection.
type streamFile struct {
	param string   // form parameter name
	name  string   // original file name as sent by the client
	size  int64    // declared content size in bytes
	file  *os.File // temporary file holding the content
}

// streamHandler serves one stream connection: it authenticates the caller
// using the token in args.Meta, collects the metadata and any uploaded files
// into a multipart.Form, dispatches it through dynamic.Invoker and writes the
// JSON-encoded result back to conn.
//
// Files are only read when args.Meta["fileNum"] is greater than zero. Each
// file is framed as described on readStreamFile and followed by one flag
// byte: '2' ends the file list, any other value means another file follows.
func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
	defer conn.Close()

	ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, args.Meta)
	token, err := GetToken(ctx)
	if err != nil {
		writeStreamError(conn, err)
		return
	}
	resp := validToken(ctx, token)
	if resp.Code != 0 {
		writeStreamError(conn, myerrors.AuthError())
		return
	}
	args.Meta["userInfo"] = resp.DataString()
	ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)

	form := new(multipart.Form)
	form.Value = map[string]string{}
	form.File = map[string]*multipart.FileHeader{}
	for key, value := range args.Meta {
		form.Value[key] = value
	}

	// Temporary files must outlive the invocation below, so they are released
	// once the handler returns. Closing before removing avoids leaking file
	// descriptors and lets the removal succeed on platforms that refuse to
	// delete open files.
	var tmpFiles []*os.File
	defer func() {
		for _, f := range tmpFiles {
			f.Close()
			os.Remove(f.Name())
		}
	}()

	if gconv.Int(args.Meta["fileNum"]) > 0 {
		for {
			sf, err := readStreamFile(conn)
			if err != nil {
				g.Log().Error(err)
				writeStreamResult(conn, streamFailure(err))
				return
			}
			if sf == nil {
				// Missing or malformed header: stop reading files.
				break
			}
			tmpFiles = append(tmpFiles, sf.file)
			form.File[sf.param] = &multipart.FileHeader{FileName: sf.name, FileSize: sf.size, File: sf.file}

			// Flag byte after each file: '2' (or a dead connection) ends the list.
			flag := make([]byte, 1)
			if _, err := io.ReadFull(conn, flag); err != nil || flag[0] == '2' {
				break
			}
		}
	}

	message := new(dynamic.Message)
	message.ClassName = args.Meta["reqService"]
	message.MethodName = args.Meta["reqMethod"]
	message.Metadata = args.Meta
	message.Payload = form

	var result []byte
	rsp, err := dynamic.Invoker.HandleInvoker(ctx, message)
	if err == nil {
		result, err = json.Marshal(rsp.Payload)
	}
	if err != nil {
		result = streamFailure(err)
	}
	writeStreamResult(conn, result)
}

// readStreamFile reads one framed file from conn into a temporary file.
//
// Frame layout: a 3-byte, space-padded decimal header length; the header
// itself, "<param> <fileName> <size>" separated by single spaces; then <size>
// bytes of content.
//
// It returns (nil, nil) when no well-formed header is available, which the
// caller treats as the end of the file list. A truncated header, an invalid
// size or a failed content read is reported as an error, and any temporary
// file created for it is removed. The returned file's offset is left at the
// end of the written content.
func readStreamFile(conn net.Conn) (*streamFile, error) {
	prefix := make([]byte, 3)
	if _, err := io.ReadFull(conn, prefix); err != nil {
		return nil, nil
	}
	length, err := strconv.Atoi(strings.TrimSpace(string(prefix)))
	if err != nil || length <= 0 {
		return nil, nil
	}

	raw := make([]byte, length)
	if _, err := io.ReadFull(conn, raw); err != nil {
		return nil, fmt.Errorf("read file header: %w", err)
	}
	fields := strings.Split(string(raw), " ")
	if len(fields) != 3 {
		return nil, nil
	}
	paramName, fileName := fields[0], fields[1]

	size, err := strconv.ParseInt(fields[2], 10, 64)
	if err != nil || size < 0 {
		return nil, fmt.Errorf("invalid file size %q", fields[2])
	}

	// Keep the original extension on the temporary file.
	tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+path.Ext(fileName))
	if err != nil {
		return nil, err
	}
	// CopyN keeps reading until exactly size bytes arrived, unlike a single
	// conn.Read which may return fewer bytes than requested.
	if _, err := io.CopyN(tmpFile, conn, size); err != nil {
		tmpFile.Close()
		os.Remove(tmpFile.Name())
		return nil, fmt.Errorf("read content of %q: %w", fileName, err)
	}
	return &streamFile{param: paramName, name: fileName, size: size, file: tmpFile}, nil
}

// streamFailure encodes err as the {"code":500,"data":...} JSON failure body.
func streamFailure(err error) []byte {
	// Marshalling a map of an int and a string cannot fail.
	body, _ := json.Marshal(map[string]interface{}{"code": 500, "data": err.Error()})
	return body
}

// writeStreamError encodes err with handError and writes it to conn.
func writeStreamError(conn net.Conn, err error) {
	body, marshalErr := handError(err)
	if marshalErr != nil {
		g.Log().Error(marshalErr)
		return
	}
	writeStreamResult(conn, body)
}

// writeStreamResult writes data to conn and logs a write failure.
func writeStreamResult(conn net.Conn, data []byte) {
	if _, err := conn.Write(data); err != nil {
		g.Log().Error(err)
	}
}

// getTenant returns the "tenant" metadata entry of msg, or "" when absent.
func getTenant(msg *protocol.Message) string {
	if msg.Metadata == nil {
		return ""
	}
	return msg.Metadata["tenant"]
}

// HandleAuth authenticates an incoming request.
//
// Requests whose path "/<ServicePath>/<ServiceMethod>" matches one of
// authExcludePaths are let through untouched. For all others the metadata
// entry "authExclude" is set to "false" and token is validated: locally via
// gtoken.GFToken when the context value "NotAuthSrv" is true, otherwise by
// calling the Auth service. On success the validation payload is stored in
// the "userInfo" metadata entry; on failure myerrors.AuthError() is returned.
func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
	reqPath := "/" + req.ServicePath + "/" + req.ServiceMethod
	tenant := getTenant(req)
	g.Log().Info("Received " + reqPath + " request @ " + tenant)

	if !authPath(reqPath, authExcludePaths) {
		return nil
	}

	// Writing to a nil map panics, so make sure the metadata map exists.
	if req.Metadata == nil {
		req.Metadata = make(map[string]string)
	}
	req.Metadata["authExclude"] = "false"

	var rsp gtoken.Resp
	if local, _ := ctx.Value("NotAuthSrv").(bool); local {
		rsp = gtoken.GFToken.ValidToken(token)
	} else {
		ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{"tenant": tenant})
		rsp = validToken(ctx, token)
	}
	if rsp.Code != 0 && rsp.Code != 200 {
		return myerrors.AuthError()
	}
	req.Metadata["userInfo"] = rsp.DataString()
	return nil
}

// authPath reports whether urlPath requires authentication (true) or matches
// one of authExcludePaths (false).
//
// One trailing "/" is ignored on both sides. An exclude entry ending in "/*"
// matches the path before "/*" itself and everything below it; the match is
// made on a path-segment boundary, so "/Auth/*" does not exempt
// "/Authority/...". Any other entry must match the whole path.
func authPath(urlPath string, authExcludePaths []string) bool {
	urlPath = strings.TrimSuffix(urlPath, "/")
	for _, excludePath := range authExcludePaths {
		if strings.HasSuffix(excludePath, "/*") {
			// Prefix match.
			prefix := strings.TrimSuffix(excludePath, "/*")
			if urlPath == prefix || gstr.HasPrefix(urlPath, prefix+"/") {
				return false
			}
			continue
		}
		// Full-path match.
		if urlPath == strings.TrimSuffix(excludePath, "/") {
			return false
		}
	}
	return true
}

// validToken validates token by calling "ValidToken" on the Auth service.
// An empty token, an unavailable client or a failed call yields Code 401;
// otherwise the service's code, message and data are returned.
func validToken(ctx context.Context, token string) gtoken.Resp {
	grsp := gtoken.Resp{}
	if token == "" {
		grsp.Code = 401
		grsp.Msg = "valid token empty"
		return grsp
	}

	authService := InitMicroSrvClient("Auth", "micro_srv.auth")
	if authService == nil {
		// No client could be built (unsupported registry or discovery error).
		g.Log().Error("auth service client is unavailable")
		grsp.Code = 401
		return grsp
	}
	defer authService.Close()

	rsp := &comm_def.CommonMsg{}
	if err := authService.Call(ctx, "ValidToken", token, rsp); err != nil {
		g.Log().Error(err)
		grsp.Code = 401
		return grsp
	}
	grsp.Code = int(rsp.Code)
	grsp.Msg = rsp.Msg
	grsp.Data = rsp.Data
	return grsp
}

// reqMetaFromContext returns the request metadata stored in ctx under
// share.ReqMetaDataKey, or a nil map when it is absent or of another type.
// Lookups on the nil map simply report "not found".
func reqMetaFromContext(ctx context.Context) map[string]string {
	meta, _ := ctx.Value(share.ReqMetaDataKey).(map[string]string)
	return meta
}

// IsAuthExclude reports whether the request was exempt from authentication:
// true when the "authExclude" metadata entry is missing or equals "true".
func IsAuthExclude(ctx context.Context) bool {
	flag, ok := reqMetaFromContext(ctx)["authExclude"]
	return !ok || flag == "true"
}

// GetUserInfo decodes the "userInfo" metadata entry of ctx into a
// request.UserInfo. It fails when the entry is missing or cannot be decoded.
func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
	userStr, ok := reqMetaFromContext(ctx)["userInfo"]
	if !ok {
		return request.UserInfo{}, errors.New("用户信息获取失败,请重新登录。")
	}
	userInfo, err := getUserInfoDataString(userStr)
	if err != nil {
		return request.UserInfo{}, errors.New("用户信息解码失败。")
	}
	return userInfo, nil
}

// GetTenant returns the "tenant" metadata entry of ctx.
func GetTenant(ctx context.Context) (string, error) {
	tenant, ok := reqMetaFromContext(ctx)["tenant"]
	if !ok {
		return "", errors.New("不存在租户码")
	}
	return tenant, nil
}

// GetReqMethod returns the "reqMethod" metadata entry of ctx.
func GetReqMethod(ctx context.Context) (string, error) {
	reqMethod, ok := reqMetaFromContext(ctx)["reqMethod"]
	if !ok {
		return "", errors.New("获取请求方式异常")
	}
	return reqMethod, nil
}

// GetToken returns the "__AUTH" metadata entry of ctx.
func GetToken(ctx context.Context) (string, error) {
	token, ok := reqMetaFromContext(ctx)["__AUTH"]
	if !ok {
		return "", errors.New("token获取失败")
	}
	return token, nil
}

// GetBrowserInfo returns the "clientIP" metadata entry and the
// base64-decoded "userAgent" metadata entry of ctx. It fails when either
// entry is missing or the user agent cannot be decoded.
func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
	reqMeta := reqMetaFromContext(ctx)
	ip, ok := reqMeta["clientIP"]
	if !ok {
		return "", "", errors.New("BrowserInfo获取失败")
	}
	encodedUA, ok := reqMeta["userAgent"]
	if !ok {
		return "", "", errors.New("BrowserInfo获取失败")
	}
	userAgent, err = gbase64.DecodeToString(encodedUA)
	return ip, userAgent, err
}

// getUserInfoDataString parses userInfoString as JSON and converts its "data"
// member into a request.UserInfo. Failures are logged and returned.
func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
	var userInfo request.UserInfo
	j, err := gjson.DecodeToJson([]byte(userInfoString))
	if err != nil {
		g.Log().Error(err)
		return userInfo, err
	}
	// NOTE(review): violence check is enabled as in the original code; confirm
	// against the gjson documentation whether it is still required here.
	j.SetViolenceCheck(true)
	if err = j.GetStruct("data", &userInfo); err != nil {
		g.Log().Error(err)
		return userInfo, err
	}
	return userInfo, nil
}

// SetTenant returns a background context carrying tenant as request metadata
// (for use by traditional Web API callers).
func SetTenant(tenant string) context.Context {
	metadata := map[string]string{"tenant": tenant}
	return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
}

// SetTenantAndAuth forwards the bearer token of r, if any, to client and
// returns a background context carrying the tenant of r as request metadata
// (for use by traditional Web API callers).
func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
	if token := getRequestToken(r); token != "" {
		client.Auth(token)
	}
	return SetTenant(request.GetTenant(r))
}

// getRequestToken extracts the token from an "Authorization: Bearer <token>"
// header of r. It returns "" when the header is missing or malformed.
func getRequestToken(r *ghttp.Request) string {
	parts := strings.SplitN(r.Header.Get("Authorization"), " ", 2)
	if len(parts) != 2 || parts[0] != "Bearer" {
		return ""
	}
	return parts[1]
}

// handError encodes the gerror code and message of err as a JSON object with
// the members "code" and "data".
func handError(err error) ([]byte, error) {
	code := gerror.Code(err)
	resp := map[string]interface{}{
		"code": code.Code(),
		"data": code.Message(),
	}
	return json.Marshal(resp)
}