micro_srv.go 11 KB


  1. package micro_srv
  2. import (
  3. "context"
  4. "dashoo.cn/opms_libary/dynamic"
  5. "dashoo.cn/opms_libary/gtoken"
  6. "dashoo.cn/opms_libary/mutipart"
  7. "dashoo.cn/opms_libary/myerrors"
  8. "dashoo.cn/opms_libary/request"
  9. "encoding/json"
  10. "errors"
  11. "github.com/gogf/gf/errors/gerror"
  12. "github.com/gogf/gf/util/gconv"
  13. "github.com/rcrowley/go-metrics"
  14. "io/ioutil"
  15. "net"
  16. "os"
  17. "path"
  18. "strconv"
  19. "strings"
  20. "time"
  21. "github.com/gogf/gf/encoding/gbase64"
  22. "github.com/gogf/gf/encoding/gjson"
  23. "github.com/gogf/gf/frame/g"
  24. "github.com/gogf/gf/net/ghttp"
  25. "github.com/gogf/gf/text/gstr"
  26. "github.com/smallnest/rpcx/client"
  27. "github.com/smallnest/rpcx/protocol"
  28. "github.com/smallnest/rpcx/server"
  29. "github.com/smallnest/rpcx/serverplugin"
  30. "github.com/smallnest/rpcx/share"
  31. "dashoo.cn/common_definition/auth"
  32. )
  33. // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string
  34. func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
  35. reg := g.Config().GetString("service_registry.registry")
  36. etcdAddr := g.Config().GetString("service_registry.server-addr")
  37. if len(args) == 2 {
  38. reg = args[0]
  39. etcdAddr = args[1]
  40. }
  41. config := g.Config().GetString(key)
  42. arr := strings.Split(config, ",")
  43. srvName := arr[0]
  44. if len(arr) == 2 { // 点对点 直连
  45. d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "")
  46. c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
  47. return c
  48. } else {
  49. if reg == "consul" { // 服务发现使用consul
  50. //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
  51. d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
  52. c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
  53. return c
  54. }
  55. }
  56. return nil
  57. }
  58. func CreateAndInitService(basePath string) *server.Server {
  59. srvAddr := g.Config().GetString("setting.bind-addr")
  60. fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
  61. etcdAddr := g.Config().GetString("service_registry.server-addr")
  62. s := server.NewServer()
  63. if fileAddr != "" {
  64. p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
  65. s.EnableStreamService(share.StreamServiceName, p)
  66. }
  67. advertiseAddr := srvAddr
  68. if g.Config().GetBool("setting.need-advertise-addr") {
  69. advertiseAddr = g.Config().GetString("setting.advertise-addr")
  70. }
  71. g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)
  72. reg := g.Config().GetString("service_registry.registry")
  73. //if reg == "etcd" {
  74. // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  75. //}
  76. if reg == "consul" {
  77. addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  78. }
  79. return s
  80. }
  81. func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
  82. r := &serverplugin.ConsulRegisterPlugin{
  83. ServiceAddress: "tcp@" + srvAddr,
  84. ConsulServers: []string{consulAddr},
  85. BasePath: basePath,
  86. Metrics: metrics.NewRegistry(),
  87. UpdateInterval: time.Minute,
  88. }
  89. err := r.Start()
  90. if err != nil {
  91. g.Log().Fatal(err)
  92. }
  93. g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
  94. s.Plugins.Add(r)
  95. }
  96. func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
  97. defer conn.Close()
  98. ctx := context.Background()
  99. ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
  100. token, err := GetToken(ctx)
  101. if err != nil {
  102. result, _ := handError(err)
  103. conn.Write(result)
  104. return
  105. }
  106. resp := validToken(token)
  107. if resp.Code != 0 {
  108. result, _ := handError(myerrors.AuthError())
  109. conn.Write(result)
  110. return
  111. }
  112. args.Meta["userInfo"] = resp.DataString()
  113. ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
  114. fileName := args.Meta["fileName"]
  115. //获取文件后缀
  116. suffix := path.Ext(fileName)
  117. tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
  118. if err != nil {
  119. g.Log().Error(err)
  120. return
  121. }
  122. defer os.Remove(tmpFile.Name())
  123. fileSize := args.Meta["fileSize"]
  124. size, _ := strconv.Atoi(fileSize)
  125. total := 0
  126. buf := make([]byte, 4096)
  127. for {
  128. n, _ := conn.Read(buf)
  129. total += n
  130. _, err = tmpFile.Write(buf)
  131. if err != nil {
  132. g.Log().Error(err)
  133. return
  134. }
  135. //如果实际总接受字节数与客户端给的要传输字节数相等,说明传输完毕
  136. if total == size {
  137. result := make([]byte, 0)
  138. className, _ := args.Meta["reqService"]
  139. methodName, _ := args.Meta["reqMethod"]
  140. message := new(dynamic.Message)
  141. message.ClassName = className
  142. message.MethodName = methodName
  143. message.Payload = &mutipart.MultipartFile{FileName: fileName, FileSize: gconv.Int64(fileSize), File: tmpFile}
  144. rsp, err := dynamic.Invoker.HandleInvoker(ctx, message)
  145. if err != nil {
  146. resp := make(map[string]interface{})
  147. resp["code"] = 500
  148. resp["data"] = err.Error()
  149. result, _ = json.Marshal(resp)
  150. } else {
  151. result, _ = json.Marshal(rsp.Payload)
  152. }
  153. conn.Write(result)
  154. break
  155. }
  156. }
  157. conn.Close()
  158. }
  159. // HandleAuth 处理Auth认证
  160. func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
  161. path := "/" + req.ServicePath + "/" + req.ServiceMethod
  162. //g.Log().Info("reqPath: ", path)
  163. //g.Log().Info("token: ", token)
  164. if authPath(path, authExcludePaths) {
  165. req.Metadata["authExclude"] = "false"
  166. var rsp gtoken.Resp
  167. notAuthSrv := ctx.Value("NotAuthSrv")
  168. if notAuthSrv != nil && notAuthSrv.(bool) {
  169. rsp = gtoken.GFToken.ValidToken(token)
  170. } else {
  171. rsp = validToken(token)
  172. }
  173. //return errors.New("InvalidToken")
  174. if rsp.Code != 0 {
  175. return myerrors.AuthError()
  176. }
  177. //userInfo, err := getUserInfoFromToken(rsp)
  178. //if err!=nil{
  179. // return err
  180. //}
  181. //g.Dump(userInfo)
  182. if req.Metadata != nil {
  183. req.Metadata["userInfo"] = rsp.DataString()
  184. }
  185. return nil
  186. }
  187. return nil
  188. }
  189. // 判断路径是否需要进行认证拦截
  190. // return true 需要认证
  191. func authPath(urlPath string, authExcludePaths []string) bool {
  192. // 去除后斜杠
  193. if strings.HasSuffix(urlPath, "/") {
  194. urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1)
  195. }
  196. // 排除路径处理,到这里nextFlag为true
  197. for _, excludePath := range authExcludePaths {
  198. tmpPath := excludePath
  199. // 前缀匹配
  200. if strings.HasSuffix(tmpPath, "/*") {
  201. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2)
  202. if gstr.HasPrefix(urlPath, tmpPath) {
  203. // 前缀匹配不拦截
  204. return false
  205. }
  206. } else {
  207. // 全路径匹配
  208. if strings.HasSuffix(tmpPath, "/") {
  209. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1)
  210. }
  211. if urlPath == tmpPath {
  212. // 全路径匹配不拦截
  213. return false
  214. }
  215. }
  216. }
  217. return true
  218. }
  219. // 验证token
  220. func validToken(token string) gtoken.Resp {
  221. grsp := gtoken.Resp{}
  222. if token == "" {
  223. grsp.Code = 401
  224. grsp.Msg = "valid token empty"
  225. return grsp
  226. }
  227. authService := InitMicroSrvClient("Auth", "micro_srv.auth")
  228. defer authService.Close()
  229. rsp := &auth.Response{}
  230. err := authService.Call(context.TODO(), "ValidToken", token, rsp)
  231. if err != nil {
  232. g.Log().Error(err)
  233. grsp.Code = 401
  234. return grsp
  235. }
  236. grsp.Code = int(rsp.Code)
  237. grsp.Msg = rsp.Msg
  238. grsp.Data = rsp.Data
  239. return grsp
  240. }
  241. // IsAuthExclude 是否进行auth验证
  242. func IsAuthExclude(ctx context.Context) bool {
  243. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  244. flag, ok := reqMeta["authExclude"]
  245. if !ok || flag == "true" {
  246. return true
  247. }
  248. return false
  249. }
  250. // GetUserInfo 从context中获取UserInfo
  251. func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
  252. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  253. userStr, ok := reqMeta["userInfo"]
  254. if !ok {
  255. return request.UserInfo{}, errors.New("用户信息获取失败,请重新登录。")
  256. }
  257. userInfo, err := getUserInfoDataString(userStr)
  258. if err != nil {
  259. return request.UserInfo{}, errors.New("用户信息解码失败。")
  260. }
  261. return userInfo, nil
  262. }
  263. // GetTenant 从context中获取租户码
  264. func GetTenant(ctx context.Context) (string, error) {
  265. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  266. tenant, ok := reqMeta["tenant"]
  267. if !ok {
  268. return "", errors.New("不存在租户码")
  269. }
  270. return tenant, nil
  271. }
  272. // GetReqMethod 从context中获取请求方式
  273. func GetReqMethod(ctx context.Context) (string, error) {
  274. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  275. reqMethod, ok := reqMeta["reqMethod"]
  276. if !ok {
  277. return "", errors.New("获取请求方式异常")
  278. }
  279. return reqMethod, nil
  280. }
  281. // GetToken 从context中获取Token
  282. func GetToken(ctx context.Context) (string, error) {
  283. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  284. token, ok := reqMeta["__AUTH"]
  285. if !ok {
  286. return "", errors.New("token获取失败")
  287. }
  288. return token, nil
  289. }
  290. // GetBrowserInfo 从context中获取ClientIP和UserAgent
  291. func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
  292. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  293. clientIP, ok := reqMeta["clientIP"]
  294. if !ok {
  295. return "", "", errors.New("BrowserInfo获取失败")
  296. }
  297. userAgent, ok = reqMeta["userAgent"]
  298. if !ok {
  299. return "", "", errors.New("BrowserInfo获取失败")
  300. }
  301. userAgent, err = gbase64.DecodeToString(userAgent)
  302. return
  303. }
  304. // getUserInfoDataString 从userInfo字符串转换成对象
  305. func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
  306. var userInfo request.UserInfo
  307. //uuid := ""
  308. if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil {
  309. g.Log().Error(err)
  310. return userInfo, err
  311. } else {
  312. j.SetViolenceCheck(true)
  313. err = j.GetStruct("data", &userInfo)
  314. if err != nil {
  315. g.Log().Error(err)
  316. return userInfo, err
  317. }
  318. }
  319. return userInfo, nil
  320. }
  321. // SetTenant 设置租户码(传统WebAPI调用使用)
  322. func SetTenant(tenant string) context.Context {
  323. metadata := map[string]string{"tenant": tenant}
  324. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  325. }
  326. // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用)
  327. func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
  328. // 处理Auth
  329. token := getRequestToken(r)
  330. if token != "" {
  331. client.Auth(token)
  332. }
  333. // 处理租户码
  334. tenant := request.GetTenant(r)
  335. metadata := map[string]string{"tenant": tenant}
  336. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  337. }
  338. // 解析token,若无,返回空
  339. func getRequestToken(r *ghttp.Request) string {
  340. authHeader := r.Header.Get("Authorization")
  341. if authHeader != "" {
  342. parts := strings.SplitN(authHeader, " ", 2)
  343. if !(len(parts) == 2 && parts[0] == "Bearer") {
  344. return ""
  345. } else if parts[1] == "" {
  346. return ""
  347. }
  348. return parts[1]
  349. }
  350. return ""
  351. }
  352. func handError(err error) ([]byte, error) {
  353. resp := make(map[string]interface{})
  354. resp["code"] = gerror.Code(err).Code()
  355. resp["data"] = gerror.Code(err).Message()
  356. return json.Marshal(resp)
  357. }