||
- package micro_srv
- import (
- "context"
- "dashoo.cn/opms_libary/dynamic"
- "dashoo.cn/opms_libary/mutipart"
- "dashoo.cn/opms_libary/request"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/rcrowley/go-metrics"
- "io/ioutil"
- "net"
- "os"
- "path"
- "reflect"
- "strconv"
- "strings"
- "time"
- "github.com/gogf/gf/encoding/gbase64"
- "github.com/gogf/gf/encoding/gjson"
- "github.com/gogf/gf/frame/g"
- "github.com/gogf/gf/net/ghttp"
- "github.com/gogf/gf/text/gstr"
- "github.com/smallnest/rpcx/client"
- "github.com/smallnest/rpcx/protocol"
- "github.com/smallnest/rpcx/server"
- "github.com/smallnest/rpcx/serverplugin"
- "github.com/smallnest/rpcx/share"
- "dashoo.cn/common_definition/auth"
- )
- var (
- fileTransfer = reflect.TypeOf((*mutipart.FileTransfer)(nil)).Elem()
- )
- // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string
- func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
- reg := g.Config().GetString("service_registry.registry")
- etcdAddr := g.Config().GetString("service_registry.server-addr")
- if len(args) == 2 {
- reg = args[0]
- etcdAddr = args[1]
- }
- config := g.Config().GetString(key)
- arr := strings.Split(config, ",")
- srvName := arr[0]
- if len(arr) == 2 { // 点对点 直连
- d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "")
- c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
- return c
- } else {
- if reg == "consul" { // 服务发现使用consul
- //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
- d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
- c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
- return c
- }
- }
- return nil
- }
- func CreateAndInitService(basePath string) *server.Server {
- srvAddr := g.Config().GetString("setting.bind-addr")
- fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
- etcdAddr := g.Config().GetString("service_registry.server-addr")
- s := server.NewServer()
- if fileAddr != "" {
- p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
- s.EnableStreamService(share.StreamServiceName, p)
- }
- advertiseAddr := srvAddr
- if g.Config().GetBool("setting.need-advertise-addr") {
- advertiseAddr = g.Config().GetString("setting.advertise-addr")
- }
- g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)
- reg := g.Config().GetString("service_registry.registry")
- //if reg == "etcd" {
- // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
- //}
- if reg == "consul" {
- addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
- }
- return s
- }
- func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
- r := &serverplugin.ConsulRegisterPlugin{
- ServiceAddress: "tcp@" + srvAddr,
- ConsulServers: []string{consulAddr},
- BasePath: basePath,
- Metrics: metrics.NewRegistry(),
- UpdateInterval: time.Minute,
- }
- err := r.Start()
- if err != nil {
- g.Log().Fatal(err)
- }
- g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
- s.Plugins.Add(r)
- }
- func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
- defer conn.Close()
- fileName := args.Meta["file_name"]
- suffix := path.Ext(fileName)
- tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
- if err != nil {
- g.Log().Error(err)
- return
- }
- defer os.Remove(tmpFile.Name())
- fileSize := args.Meta["file_size"]
- size, _ := strconv.Atoi(fileSize)
- total := 0
- buf := make([]byte, 4096)
- for {
- n, _ := conn.Read(buf)
- total += n
- _, err = tmpFile.Write(buf)
- if err != nil {
- g.Log().Error(err)
- return
- }
- //如果实际总接受字节数与客户端给的要传输字节数相等,说明传输完毕
- if total == size {
- beanName, ok := args.Meta["bean_name"]
- if ok {
- result, err := handDynamicService(beanName, tmpFile, args.Meta)
- if err != nil {
- resp := make(map[string]interface{})
- resp["code"] = 500
- resp["data"] = err.Error()
- result, _ = json.Marshal(resp)
- }
- conn.Write(result)
- } else {
- es := errors.New(fmt.Sprintf("Service[%s] 没有实现HandleFileTransfer接口", args.Meta["bean_name"]))
- g.Log().Error(es)
- resp := make(map[string]interface{})
- resp["code"] = 500
- resp["data"] = es.Error()
- result, _ := json.Marshal(resp)
- conn.Write(result)
- }
- break
- }
- }
- conn.Close()
- }
- func handDynamicService(beanName string, file *os.File, param map[string]string) ([]byte, error) {
- bean := dynamic.BeanFactory.GetBean(beanName)
- if !bean.IsValid() {
- ce := errors.New(fmt.Sprintf("Service[%s] 没有注册", beanName))
- g.Log().Error(ce)
- return nil, ce
- }
- typ := bean.Type()
- if bean.CanInterface() && typ.Implements(fileTransfer) {
- service := bean.Interface().(mutipart.FileTransfer)
- resp, err := service.HandleFileTransfer(file, param)
- if err != nil {
- ce := errors.New(fmt.Sprintf("Service[%s] 文件处理异常", beanName))
- g.Log().Error(ce, err)
- return nil, ce
- } else {
- return json.Marshal(resp)
- }
- } else {
- ce := errors.New(fmt.Sprintf("Service[%s] 没有实现HandleFileTransfer接口", beanName))
- g.Log().Error(ce)
- return nil, ce
- }
- }
- // HandleAuth 处理Auth认证
- func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
- path := "/" + req.ServicePath + "/" + req.ServiceMethod
- //g.Log().Info("reqPath: ", path)
- //g.Log().Info("token: ", token)
- if authPath(path, authExcludePaths) {
- rsp := validToken(token)
- //return errors.New("InvalidToken")
- if rsp.Code != 0 {
- return errors.New("InvalidToken")
- }
- //userInfo, err := getUserInfoFromToken(rsp)
- //if err!=nil{
- // return err
- //}
- //g.Dump(userInfo)
- if req.Metadata != nil {
- req.Metadata["userInfo"] = rsp.Data
- }
- return nil
- }
- return nil
- }
- // 判断路径是否需要进行认证拦截
- // return true 需要认证
- func authPath(urlPath string, authExcludePaths []string) bool {
- // 去除后斜杠
- if strings.HasSuffix(urlPath, "/") {
- urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1)
- }
- // 排除路径处理,到这里nextFlag为true
- for _, excludePath := range authExcludePaths {
- tmpPath := excludePath
- // 前缀匹配
- if strings.HasSuffix(tmpPath, "/*") {
- tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2)
- if gstr.HasPrefix(urlPath, tmpPath) {
- // 前缀匹配不拦截
- return false
- }
- } else {
- // 全路径匹配
- if strings.HasSuffix(tmpPath, "/") {
- tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1)
- }
- if urlPath == tmpPath {
- // 全路径匹配不拦截
- return false
- }
- }
- }
- return true
- }
- // 验证token
- func validToken(token string) auth.Response {
- rsp := &auth.Response{}
- if token == "" {
- rsp.Code = 401
- rsp.Msg = "valid token empty"
- return *rsp
- }
- authService := InitMicroSrvClient("Auth", "micro_srv.auth")
- defer authService.Close()
- err := authService.Call(context.TODO(), "ValidToken", token, rsp)
- if err != nil {
- g.Log().Error(err)
- rsp.Code = 401
- return *rsp
- }
- return *rsp
- }
- // GetUserInfo 从context中获取UserInfo
- func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- userStr, ok := reqMeta["userInfo"]
- if !ok {
- return request.UserInfo{}, errors.New("不存在UserInfo数据")
- }
- userInfo, err := getUserInfoDataString(userStr)
- if err != nil {
- return request.UserInfo{}, err
- }
- return userInfo, nil
- }
- // GetTenant 从context中获取租户码
- func GetTenant(ctx context.Context) (string, error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- tenant, ok := reqMeta["tenant"]
- if !ok {
- return "", errors.New("不存在租户码")
- }
- return tenant, nil
- }
- // GetToken 从context中获取Token
- func GetToken(ctx context.Context) (string, error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- token, ok := reqMeta["__AUTH"]
- if !ok {
- return "", errors.New("token获取失败")
- }
- return token, nil
- }
- // GetBrowserInfo 从context中获取ClientIP和UserAgent
- func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
- reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
- clientIP, ok := reqMeta["clientIP"]
- if !ok {
- return "", "", errors.New("BrowserInfo获取失败")
- }
- userAgent, ok = reqMeta["userAgent"]
- if !ok {
- return "", "", errors.New("BrowserInfo获取失败")
- }
- userAgent, err = gbase64.DecodeToString(userAgent)
- return
- }
- // getUserInfoDataString 从userInfo字符串转换成对象
- func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
- var userInfo request.UserInfo
- //uuid := ""
- if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil {
- g.Log().Error(err)
- return userInfo, err
- } else {
- j.SetViolenceCheck(true)
- err = j.GetStruct("data", &userInfo)
- if err != nil {
- g.Log().Error(err)
- return userInfo, err
- }
- }
- return userInfo, nil
- }
- // SetTenant 设置租户码(传统WebAPI调用使用)
- func SetTenant(tenant string) context.Context {
- metadata := map[string]string{"tenant": tenant}
- return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
- }
- // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用)
- func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
- // 处理Auth
- token := getRequestToken(r)
- if token != "" {
- client.Auth(token)
- }
- // 处理租户码
- tenant := request.GetTenant(r)
- metadata := map[string]string{"tenant": tenant}
- return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
- }
- // 解析token,若无,返回空
- func getRequestToken(r *ghttp.Request) string {
- authHeader := r.Header.Get("Authorization")
- if authHeader != "" {
- parts := strings.SplitN(authHeader, " ", 2)
- if !(len(parts) == 2 && parts[0] == "Bearer") {
- return ""
- } else if parts[1] == "" {
- return ""
- }
- return parts[1]
- }
- return ""
- }
|