micro_srv.go 9.9 KB


  1. package micro_srv
  2. import (
  3. "context"
  4. "dashoo.cn/opms_libary/dynamic"
  5. "dashoo.cn/opms_libary/mutipart"
  6. "dashoo.cn/opms_libary/request"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "github.com/rcrowley/go-metrics"
  11. "io/ioutil"
  12. "net"
  13. "os"
  14. "path"
  15. "reflect"
  16. "strconv"
  17. "strings"
  18. "time"
  19. "github.com/gogf/gf/encoding/gbase64"
  20. "github.com/gogf/gf/encoding/gjson"
  21. "github.com/gogf/gf/frame/g"
  22. "github.com/gogf/gf/net/ghttp"
  23. "github.com/gogf/gf/text/gstr"
  24. "github.com/smallnest/rpcx/client"
  25. "github.com/smallnest/rpcx/protocol"
  26. "github.com/smallnest/rpcx/server"
  27. "github.com/smallnest/rpcx/serverplugin"
  28. "github.com/smallnest/rpcx/share"
  29. "dashoo.cn/common_definition/auth"
  30. )
  31. var (
  32. fileTransfer = reflect.TypeOf((*mutipart.FileTransfer)(nil)).Elem()
  33. )
  34. // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string
  35. func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
  36. reg := g.Config().GetString("service_registry.registry")
  37. etcdAddr := g.Config().GetString("service_registry.server-addr")
  38. if len(args) == 2 {
  39. reg = args[0]
  40. etcdAddr = args[1]
  41. }
  42. config := g.Config().GetString(key)
  43. arr := strings.Split(config, ",")
  44. srvName := arr[0]
  45. if len(arr) == 2 { // 点对点 直连
  46. d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "")
  47. c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
  48. return c
  49. } else {
  50. if reg == "consul" { // 服务发现使用consul
  51. //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
  52. d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
  53. c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
  54. return c
  55. }
  56. }
  57. return nil
  58. }
  59. func CreateAndInitService(basePath string) *server.Server {
  60. srvAddr := g.Config().GetString("setting.bind-addr")
  61. fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
  62. etcdAddr := g.Config().GetString("service_registry.server-addr")
  63. s := server.NewServer()
  64. if fileAddr != "" {
  65. p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
  66. s.EnableStreamService(share.StreamServiceName, p)
  67. }
  68. advertiseAddr := srvAddr
  69. if g.Config().GetBool("setting.need-advertise-addr") {
  70. advertiseAddr = g.Config().GetString("setting.advertise-addr")
  71. }
  72. g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)
  73. reg := g.Config().GetString("service_registry.registry")
  74. //if reg == "etcd" {
  75. // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  76. //}
  77. if reg == "consul" {
  78. addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  79. }
  80. return s
  81. }
  82. func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
  83. r := &serverplugin.ConsulRegisterPlugin{
  84. ServiceAddress: "tcp@" + srvAddr,
  85. ConsulServers: []string{consulAddr},
  86. BasePath: basePath,
  87. Metrics: metrics.NewRegistry(),
  88. UpdateInterval: time.Minute,
  89. }
  90. err := r.Start()
  91. if err != nil {
  92. g.Log().Fatal(err)
  93. }
  94. g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
  95. s.Plugins.Add(r)
  96. }
  97. func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
  98. defer conn.Close()
  99. fileName := args.Meta["file_name"]
  100. suffix := path.Ext(fileName)
  101. tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
  102. if err != nil {
  103. g.Log().Error(err)
  104. return
  105. }
  106. defer os.Remove(tmpFile.Name())
  107. fileSize := args.Meta["file_size"]
  108. size, _ := strconv.Atoi(fileSize)
  109. total := 0
  110. buf := make([]byte, 4096)
  111. for {
  112. n, _ := conn.Read(buf)
  113. total += n
  114. _, err = tmpFile.Write(buf)
  115. if err != nil {
  116. g.Log().Error(err)
  117. return
  118. }
  119. //如果实际总接受字节数与客户端给的要传输字节数相等,说明传输完毕
  120. if total == size {
  121. beanName, ok := args.Meta["bean_name"]
  122. if ok {
  123. result, err := handDynamicService(beanName, tmpFile, args.Meta)
  124. if err != nil {
  125. resp := make(map[string]interface{})
  126. resp["code"] = 500
  127. resp["data"] = err.Error()
  128. result, _ = json.Marshal(resp)
  129. }
  130. conn.Write(result)
  131. } else {
  132. es := errors.New(fmt.Sprintf("Service[%s] 没有实现HandleFileTransfer接口", args.Meta["bean_name"]))
  133. g.Log().Error(es)
  134. resp := make(map[string]interface{})
  135. resp["code"] = 500
  136. resp["data"] = es.Error()
  137. result, _ := json.Marshal(resp)
  138. conn.Write(result)
  139. }
  140. break
  141. }
  142. }
  143. conn.Close()
  144. }
  145. func handDynamicService(beanName string, file *os.File, param map[string]string) ([]byte, error) {
  146. bean := dynamic.BeanFactory.GetBean(beanName)
  147. if !bean.IsValid() {
  148. ce := errors.New(fmt.Sprintf("Service[%s] 没有注册", beanName))
  149. g.Log().Error(ce)
  150. return nil, ce
  151. }
  152. typ := bean.Type()
  153. if bean.CanInterface() && typ.Implements(fileTransfer) {
  154. service := bean.Interface().(mutipart.FileTransfer)
  155. resp, err := service.HandleFileTransfer(file, param)
  156. if err != nil {
  157. ce := errors.New(fmt.Sprintf("Service[%s] 文件处理异常", beanName))
  158. g.Log().Error(ce, err)
  159. return nil, ce
  160. } else {
  161. return json.Marshal(resp)
  162. }
  163. } else {
  164. ce := errors.New(fmt.Sprintf("Service[%s] 没有实现HandleFileTransfer接口", beanName))
  165. g.Log().Error(ce)
  166. return nil, ce
  167. }
  168. }
  169. // HandleAuth 处理Auth认证
  170. func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
  171. path := "/" + req.ServicePath + "/" + req.ServiceMethod
  172. //g.Log().Info("reqPath: ", path)
  173. //g.Log().Info("token: ", token)
  174. if authPath(path, authExcludePaths) {
  175. rsp := validToken(token)
  176. //return errors.New("InvalidToken")
  177. if rsp.Code != 0 {
  178. return errors.New("InvalidToken")
  179. }
  180. //userInfo, err := getUserInfoFromToken(rsp)
  181. //if err!=nil{
  182. // return err
  183. //}
  184. //g.Dump(userInfo)
  185. if req.Metadata != nil {
  186. req.Metadata["userInfo"] = rsp.Data
  187. }
  188. return nil
  189. }
  190. return nil
  191. }
  192. // 判断路径是否需要进行认证拦截
  193. // return true 需要认证
  194. func authPath(urlPath string, authExcludePaths []string) bool {
  195. // 去除后斜杠
  196. if strings.HasSuffix(urlPath, "/") {
  197. urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1)
  198. }
  199. // 排除路径处理,到这里nextFlag为true
  200. for _, excludePath := range authExcludePaths {
  201. tmpPath := excludePath
  202. // 前缀匹配
  203. if strings.HasSuffix(tmpPath, "/*") {
  204. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2)
  205. if gstr.HasPrefix(urlPath, tmpPath) {
  206. // 前缀匹配不拦截
  207. return false
  208. }
  209. } else {
  210. // 全路径匹配
  211. if strings.HasSuffix(tmpPath, "/") {
  212. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1)
  213. }
  214. if urlPath == tmpPath {
  215. // 全路径匹配不拦截
  216. return false
  217. }
  218. }
  219. }
  220. return true
  221. }
  222. // 验证token
  223. func validToken(token string) auth.Response {
  224. rsp := &auth.Response{}
  225. if token == "" {
  226. rsp.Code = 401
  227. rsp.Msg = "valid token empty"
  228. return *rsp
  229. }
  230. authService := InitMicroSrvClient("Auth", "micro_srv.auth")
  231. defer authService.Close()
  232. err := authService.Call(context.TODO(), "ValidToken", token, rsp)
  233. if err != nil {
  234. g.Log().Error(err)
  235. rsp.Code = 401
  236. return *rsp
  237. }
  238. return *rsp
  239. }
  240. // GetUserInfo 从context中获取UserInfo
  241. func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
  242. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  243. userStr, ok := reqMeta["userInfo"]
  244. if !ok {
  245. return request.UserInfo{}, errors.New("不存在UserInfo数据")
  246. }
  247. userInfo, err := getUserInfoDataString(userStr)
  248. if err != nil {
  249. return request.UserInfo{}, err
  250. }
  251. return userInfo, nil
  252. }
  253. // GetTenant 从context中获取租户码
  254. func GetTenant(ctx context.Context) (string, error) {
  255. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  256. tenant, ok := reqMeta["tenant"]
  257. if !ok {
  258. return "", errors.New("不存在租户码")
  259. }
  260. return tenant, nil
  261. }
  262. // GetToken 从context中获取Token
  263. func GetToken(ctx context.Context) (string, error) {
  264. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  265. token, ok := reqMeta["__AUTH"]
  266. if !ok {
  267. return "", errors.New("token获取失败")
  268. }
  269. return token, nil
  270. }
  271. // GetBrowserInfo 从context中获取ClientIP和UserAgent
  272. func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
  273. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  274. clientIP, ok := reqMeta["clientIP"]
  275. if !ok {
  276. return "", "", errors.New("BrowserInfo获取失败")
  277. }
  278. userAgent, ok = reqMeta["userAgent"]
  279. if !ok {
  280. return "", "", errors.New("BrowserInfo获取失败")
  281. }
  282. userAgent, err = gbase64.DecodeToString(userAgent)
  283. return
  284. }
  285. // getUserInfoDataString 从userInfo字符串转换成对象
  286. func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
  287. var userInfo request.UserInfo
  288. //uuid := ""
  289. if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil {
  290. g.Log().Error(err)
  291. return userInfo, err
  292. } else {
  293. j.SetViolenceCheck(true)
  294. err = j.GetStruct("data", &userInfo)
  295. if err != nil {
  296. g.Log().Error(err)
  297. return userInfo, err
  298. }
  299. }
  300. return userInfo, nil
  301. }
  302. // SetTenant 设置租户码(传统WebAPI调用使用)
  303. func SetTenant(tenant string) context.Context {
  304. metadata := map[string]string{"tenant": tenant}
  305. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  306. }
  307. // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用)
  308. func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
  309. // 处理Auth
  310. token := getRequestToken(r)
  311. if token != "" {
  312. client.Auth(token)
  313. }
  314. // 处理租户码
  315. tenant := request.GetTenant(r)
  316. metadata := map[string]string{"tenant": tenant}
  317. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  318. }
  319. // 解析token,若无,返回空
  320. func getRequestToken(r *ghttp.Request) string {
  321. authHeader := r.Header.Get("Authorization")
  322. if authHeader != "" {
  323. parts := strings.SplitN(authHeader, " ", 2)
  324. if !(len(parts) == 2 && parts[0] == "Bearer") {
  325. return ""
  326. } else if parts[1] == "" {
  327. return ""
  328. }
  329. return parts[1]
  330. }
  331. return ""
  332. }