| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- package micro_srv
- import (
- "context"
- "dashoo.cn/opms_libary/dynamic"
- "dashoo.cn/opms_libary/gtoken"
- "dashoo.cn/opms_libary/mutipart"
- "dashoo.cn/opms_libary/myerrors"
- "dashoo.cn/opms_libary/request"
- "encoding/json"
- "errors"
- "github.com/gogf/gf/errors/gerror"
- "github.com/gogf/gf/util/gconv"
- "github.com/rcrowley/go-metrics"
- "io/ioutil"
- "net"
- "os"
- "path"
- "strconv"
- "strings"
- "time"
- "github.com/gogf/gf/encoding/gbase64"
- "github.com/gogf/gf/encoding/gjson"
- "github.com/gogf/gf/frame/g"
- "github.com/gogf/gf/net/ghttp"
- "github.com/gogf/gf/text/gstr"
- "github.com/smallnest/rpcx/client"
- "github.com/smallnest/rpcx/protocol"
- "github.com/smallnest/rpcx/server"
- "github.com/smallnest/rpcx/serverplugin"
- "github.com/smallnest/rpcx/share"
- "dashoo.cn/common_definition/auth"
- )
- // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string
- func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
- reg := g.Config().GetString("service_registry.registry")
- etcdAddr := g.Config().GetString("service_registry.server-addr")
- if len(args) == 2 {
- reg = args[0]
- etcdAddr = args[1]
- }
- config := g.Config().GetString(key)
- arr := strings.Split(config, ",")
- srvName := arr[0]
- if len(arr) == 2 { // 点对点 直连
- d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "")
- c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
- return c
- } else {
- if reg == "consul" { // 服务发现使用consul
- //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
- d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
- c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
- return c
- }
- }
- return nil
- }
- func CreateAndInitService(basePath string) *server.Server {
- srvAddr := g.Config().GetString("setting.bind-addr")
- fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
- etcdAddr := g.Config().GetString("service_registry.server-addr")
- s := server.NewServer()
- if fileAddr != "" {
- p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
- s.EnableStreamService(share.StreamServiceName, p)
- }
- advertiseAddr := srvAddr
- if g.Config().GetBool("setting.need-advertise-addr") {
- advertiseAddr = g.Config().GetString("setting.advertise-addr")
- }
- g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)
- reg := g.Config().GetString("service_registry.registry")
- //if reg == "etcd" {
- // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
- //}
- if reg == "consul" {
- addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
- }
- return s
- }
- func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
- r := &serverplugin.ConsulRegisterPlugin{
- ServiceAddress: "tcp@" + srvAddr,
- ConsulServers: []string{consulAddr},
- BasePath: basePath,
- Metrics: metrics.NewRegistry(),
- UpdateInterval: time.Minute,
- }
- err := r.Start()
- if err != nil {
- g.Log().Fatal(err)
- }
- g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
- s.Plugins.Add(r)
- }
- func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
- defer conn.Close()
- ctx := context.Background()
- ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
- token, err := GetToken(ctx)
- if err != nil {
- result, _ := handError(err)
- conn.Write(result)
- return
- }
- resp := validToken(token)
- if resp.Code != 0 {
- result, _ := handError(myerrors.AuthError())
- conn.Write(result)
- return
- }
- args.Meta["userInfo"] = resp.DataString()
- ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
- fileName := args.Meta["fileName"]
- //获取文件后缀
- suffix := path.Ext(fileName)
- tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
- if err != nil {
- g.Log().Error(err)
- return
- }
- defer os.Remove(tmpFile.Name())
- fileSize := args.Meta["fileSize"]
- size, _ := strconv.Atoi(fileSize)
- total := 0
- buf := make([]byte, 4096)
- for {
- n, _ := conn.Read(buf)
- total += n
- _, err = tmpFile.Write(buf)
- if err != nil {
- g.Log().Error(err)
- return
- }
- //如果实际总接受字节数与客户端给的要传输字节数相等,说明传输完毕
- if total == size {
- result := make([]byte, 0)
- className, _ := args.Meta["reqService"]
- methodName, _ := args.Meta["reqMethod"]
- message := new(dynamic.Message)
- message.ClassName = className
- message.MethodName = methodName
- message.Payload = &mutipart.MultipartFile{FileName: fileName, FileSize: gconv.Int64(fileSize), File: tmpFile}
- rsp, err := dynamic.Invoker.HandleInvoker(ctx, message)
- if err != nil {
- resp := make(map[string]interface{})
- resp["code"] = 500
- resp["data"] = err.Error()
- result, _ = json.Marshal(resp)
- } else {
- result, _ = json.Marshal(rsp.Payload)
- }
- conn.Write(result)
- break
- }
- }
- conn.Close()
- }
- // HandleAuth 处理Auth认证
- func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
- path := "/" + req.ServicePath + "/" + req.ServiceMethod
- //g.Log().Info("reqPath: ", path)
- //g.Log().Info("token: ", token)
- if authPath(path, authExcludePaths) {
- req.Metadata["authExclude"] = "false"
- var rsp gtoken.Resp
- notAuthSrv := ctx.Value("NotAuthSrv")
- if notAuthSrv != nil && notAuthSrv.(bool) {
- rsp = gtoken.GFToken.ValidToken(token)
- } else {
- rsp = validToken(token)
- }
- //return errors.New("InvalidToken")
- if rsp.Code != 0 {
- return myerrors.AuthError()
- }
- //userInfo, err := getUserInfoFromToken(rsp)
- //if err!=nil{
- // return err
- //}
- //g.Dump(userInfo)
- if req.Metadata != nil {
- req.Metadata["userInfo"] = rsp.DataString()
- }
- return nil
- }
- return nil
- }
- // 判断路径是否需要进行认证拦截
- // return true 需要认证
- func authPath(urlPath string, authExcludePaths []string) bool {
- // 去除后斜杠
- if strings.HasSuffix(urlPath, "/") {
- urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1)
- }
- // 排除路径处理,到这里nextFlag为true
- for _, excludePath := range authExcludePaths {
- tmpPath := excludePath
- // 前缀匹配
- if strings.HasSuffix(tmpPath, "/*") {
- tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2)
- if gstr.HasPrefix(urlPath, tmpPath) {
- // 前缀匹配不拦截
- return false
- }
- } else {
- // 全路径匹配
- if strings.HasSuffix(tmpPath, "/") {
- tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1)
- }
- if urlPath == tmpPath {
- // 全路径匹配不拦截
- return false
- }
- }
- }
- return true
- }
- // 验证token
- func validToken(token string) gtoken.Resp {
- grsp := gtoken.Resp{}
- if token == "" {
- grsp.Code = 401
- grsp.Msg = "valid token empty"
- return grsp
- }
- authService := InitMicroSrvClient("Auth", "micro_srv.auth")
- defer authService.Close()
- rsp := &auth.Response{}
- err := authService.Call(context.TODO(), "ValidToken", token, rsp)
- if err != nil {
- g.Log().Error(err)
- grsp.Code = 401
- return grsp
- }
- grsp.Code = int(rsp.Code)
- grsp.Msg = rsp.Msg
- grsp.Data = rsp.Data
- return grsp
- }
- // IsAuthExclude 是否进行auth验证
- func IsAuthExclude(ctx context.Context) bool {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- flag, ok := reqMeta["authExclude"]
- if !ok || flag == "true" {
- return true
- }
- return false
- }
- // GetUserInfo 从context中获取UserInfo
- func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- userStr, ok := reqMeta["userInfo"]
- if !ok {
- return request.UserInfo{}, errors.New("用户信息获取失败,请重新登录。")
- }
- userInfo, err := getUserInfoDataString(userStr)
- if err != nil {
- return request.UserInfo{}, errors.New("用户信息解码失败。")
- }
- return userInfo, nil
- }
- // GetTenant 从context中获取租户码
- func GetTenant(ctx context.Context) (string, error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- tenant, ok := reqMeta["tenant"]
- if !ok {
- return "", errors.New("不存在租户码")
- }
- return tenant, nil
- }
- // GetReqMethod 从context中获取请求方式
- func GetReqMethod(ctx context.Context) (string, error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- reqMethod, ok := reqMeta["reqMethod"]
- if !ok {
- return "", errors.New("获取请求方式异常")
- }
- return reqMethod, nil
- }
- // GetToken 从context中获取Token
- func GetToken(ctx context.Context) (string, error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- token, ok := reqMeta["__AUTH"]
- if !ok {
- return "", errors.New("token获取失败")
- }
- return token, nil
- }
- // GetBrowserInfo 从context中获取ClientIP和UserAgent
- func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- clientIP, ok := reqMeta["clientIP"]
- if !ok {
- return "", "", errors.New("BrowserInfo获取失败")
- }
- userAgent, ok = reqMeta["userAgent"]
- if !ok {
- return "", "", errors.New("BrowserInfo获取失败")
- }
- userAgent, err = gbase64.DecodeToString(userAgent)
- return
- }
- // getUserInfoDataString 从userInfo字符串转换成对象
- func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
- var userInfo request.UserInfo
- //uuid := ""
- if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil {
- g.Log().Error(err)
- return userInfo, err
- } else {
- j.SetViolenceCheck(true)
- err = j.GetStruct("data", &userInfo)
- if err != nil {
- g.Log().Error(err)
- return userInfo, err
- }
- }
- return userInfo, nil
- }
- // SetTenant 设置租户码(传统WebAPI调用使用)
- func SetTenant(tenant string) context.Context {
- metadata := map[string]string{"tenant": tenant}
- return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
- }
- // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用)
- func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
- // 处理Auth
- token := getRequestToken(r)
- if token != "" {
- client.Auth(token)
- }
- // 处理租户码
- tenant := request.GetTenant(r)
- metadata := map[string]string{"tenant": tenant}
- return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
- }
- // 解析token,若无,返回空
- func getRequestToken(r *ghttp.Request) string {
- authHeader := r.Header.Get("Authorization")
- if authHeader != "" {
- parts := strings.SplitN(authHeader, " ", 2)
- if !(len(parts) == 2 && parts[0] == "Bearer") {
- return ""
- } else if parts[1] == "" {
- return ""
- }
- return parts[1]
- }
- return ""
- }
- func handError(err error) ([]byte, error) {
- resp := make(map[string]interface{})
- resp["code"] = gerror.Code(err).Code()
- resp["data"] = gerror.Code(err).Message()
- return json.Marshal(resp)
- }
|