micro_srv.go 12 KB


  1. package micro_srv
  2. import (
  3. "context"
  4. "dashoo.cn/opms_libary/dynamic"
  5. "dashoo.cn/opms_libary/multipart"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "github.com/gogf/gf/util/gconv"
  10. "io/ioutil"
  11. "net"
  12. "os"
  13. "path"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "dashoo.cn/common_definition/comm_def"
  18. "dashoo.cn/opms_libary/gtoken"
  19. "dashoo.cn/opms_libary/myerrors"
  20. "dashoo.cn/opms_libary/request"
  21. "github.com/gogf/gf/encoding/gbase64"
  22. "github.com/gogf/gf/encoding/gjson"
  23. "github.com/gogf/gf/errors/gerror"
  24. "github.com/gogf/gf/frame/g"
  25. "github.com/gogf/gf/net/ghttp"
  26. "github.com/gogf/gf/text/gstr"
  27. "github.com/rcrowley/go-metrics"
  28. consulclient "github.com/rpcxio/rpcx-consul/client"
  29. "github.com/rpcxio/rpcx-consul/serverplugin"
  30. "github.com/smallnest/rpcx/client"
  31. "github.com/smallnest/rpcx/protocol"
  32. "github.com/smallnest/rpcx/server"
  33. "github.com/smallnest/rpcx/share"
  34. )
  35. // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string
  36. func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
  37. reg := g.Config().GetString("service_registry.registry")
  38. etcdAddr := g.Config().GetString("service_registry.server-addr")
  39. if len(args) == 2 {
  40. reg = args[0]
  41. etcdAddr = args[1]
  42. }
  43. config := g.Config().GetString(key)
  44. arr := strings.Split(config, ",")
  45. srvName := arr[0]
  46. if len(arr) == 2 { // 点对点 直连
  47. d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "")
  48. c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
  49. return c
  50. } else {
  51. if reg == "consul" { // 服务发现使用consul
  52. //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
  53. d, _ := consulclient.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
  54. //d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
  55. c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
  56. return c
  57. }
  58. }
  59. return nil
  60. }
  61. func CreateAndInitService(basePath string) *server.Server {
  62. srvAddr := g.Config().GetString("setting.bind-addr")
  63. fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
  64. etcdAddr := g.Config().GetString("service_registry.server-addr")
  65. s := server.NewServer()
  66. advertiseAddr := srvAddr
  67. if g.Config().GetBool("setting.need-advertise-addr") {
  68. advertiseAddr = g.Config().GetString("setting.advertise-addr")
  69. }
  70. g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)
  71. reg := g.Config().GetString("service_registry.registry")
  72. //if reg == "etcd" {
  73. // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  74. //}
  75. if reg == "consul" {
  76. addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  77. }
  78. if fileAddr != "" {
  79. p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
  80. s.EnableStreamService(share.StreamServiceName, p)
  81. }
  82. return s
  83. }
  84. func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
  85. r := &serverplugin.ConsulRegisterPlugin{
  86. ServiceAddress: "tcp@" + srvAddr,
  87. ConsulServers: []string{consulAddr},
  88. BasePath: basePath,
  89. Metrics: metrics.NewRegistry(),
  90. UpdateInterval: time.Minute,
  91. }
  92. err := r.Start()
  93. if err != nil {
  94. g.Log().Fatal(err)
  95. }
  96. g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
  97. s.Plugins.Add(r)
  98. }
  99. func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
  100. defer conn.Close()
  101. ctx := context.Background()
  102. ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
  103. token, err := GetToken(ctx)
  104. if err != nil {
  105. result, _ := handError(err)
  106. conn.Write(result)
  107. return
  108. }
  109. resp := validToken(ctx, token)
  110. if resp.Code != 0 {
  111. result, _ := handError(myerrors.AuthError())
  112. conn.Write(result)
  113. return
  114. }
  115. args.Meta["userInfo"] = resp.DataString()
  116. ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
  117. var form = new(multipart.Form)
  118. form.Value = map[string]string{}
  119. form.File = map[string]*multipart.FileHeader{}
  120. for key, value := range args.Meta {
  121. form.Value[key] = value
  122. }
  123. fileNum := gconv.Int(args.Meta["fileNum"])
  124. if fileNum > 0 {
  125. for {
  126. // 读取文件名长度
  127. buf := make([]byte, 3)
  128. conn.Read(buf)
  129. length, _ := strconv.Atoi(strings.TrimSpace(string(buf)))
  130. // 读取文件名
  131. fileHeader := make([]byte, length)
  132. _, err := conn.Read(fileHeader)
  133. headers := strings.Split(string(fileHeader), " ")
  134. if len(headers) != 3 {
  135. fmt.Println(form, "111111")
  136. break
  137. }
  138. paramName := headers[0]
  139. fileName := headers[1]
  140. fileSize := headers[2]
  141. //获取文件后缀
  142. suffix := path.Ext(fileName)
  143. tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
  144. if err != nil {
  145. g.Log().Error(err)
  146. return
  147. }
  148. defer os.Remove(tmpFile.Name())
  149. size, _ := strconv.Atoi(fileSize)
  150. curSize := 0
  151. for {
  152. if curSize+1024 > size {
  153. buf = make([]byte, size-curSize)
  154. curSize = size
  155. } else {
  156. buf = make([]byte, 1024)
  157. curSize = curSize + 1024
  158. }
  159. conn.Read(buf)
  160. tmpFile.Write(buf)
  161. if curSize == size {
  162. break
  163. }
  164. }
  165. fmt.Println("1==============================")
  166. form.File[paramName] = &multipart.FileHeader{FileName: fileName, FileSize: int64(size), File: tmpFile}
  167. fmt.Println(form)
  168. // 判断是否结束
  169. isEnd := make([]byte, 1)
  170. conn.Read(isEnd)
  171. fmt.Println(gconv.String(isEnd))
  172. if gconv.String(isEnd) == "1" {
  173. fmt.Println("2==============================")
  174. continue
  175. }
  176. if gconv.String(isEnd) == "2" {
  177. fmt.Println("3==============================")
  178. break
  179. }
  180. fmt.Println("4==============================")
  181. }
  182. fmt.Println("5==============================")
  183. }
  184. fmt.Println("6==============================")
  185. result := make([]byte, 0)
  186. className, _ := args.Meta["reqService"]
  187. methodName, _ := args.Meta["reqMethod"]
  188. message := new(dynamic.Message)
  189. message.ClassName = className
  190. message.MethodName = methodName
  191. message.Metadata = args.Meta
  192. message.Payload = form
  193. rsp, err := dynamic.Invoker.HandleInvoker(ctx, message)
  194. if err != nil {
  195. resp := make(map[string]interface{})
  196. resp["code"] = 500
  197. resp["data"] = err.Error()
  198. result, _ = json.Marshal(resp)
  199. } else {
  200. result, _ = json.Marshal(rsp.Payload)
  201. }
  202. conn.Write(result)
  203. conn.Close()
  204. }
  205. func getTenant(msg *protocol.Message) string {
  206. var tenant string
  207. if msg.Metadata != nil {
  208. tenant = msg.Metadata["tenant"]
  209. }
  210. return tenant
  211. }
  212. // HandleAuth 处理Auth认证
  213. func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
  214. reqPath := "/" + req.ServicePath + "/" + req.ServiceMethod
  215. tenant := getTenant(req)
  216. g.Log().Info("Received " + reqPath + " tenant @ " + tenant + " request @ " + gconv.String(req))
  217. if authPath(reqPath, authExcludePaths) {
  218. req.Metadata["authExclude"] = "false"
  219. var rsp gtoken.Resp
  220. notAuthSrv := ctx.Value("NotAuthSrv")
  221. g.Log().Info("notAuthSrv:" + gconv.String(notAuthSrv))
  222. if notAuthSrv != nil && notAuthSrv.(bool) {
  223. rsp = gtoken.GFToken.ValidToken(token)
  224. } else {
  225. ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{"tenant": tenant})
  226. rsp = validToken(ctx, token)
  227. }
  228. g.Log().Info("rsp:" + gconv.String(rsp))
  229. if rsp.Code != 0 && rsp.Code != 200 {
  230. return myerrors.AuthError()
  231. }
  232. if req.Metadata != nil {
  233. req.Metadata["userInfo"] = rsp.DataString()
  234. }
  235. return nil
  236. }
  237. return nil
  238. }
  239. // 判断路径是否需要进行认证拦截
  240. // return true 需要认证
  241. func authPath(urlPath string, authExcludePaths []string) bool {
  242. // 去除后斜杠
  243. if strings.HasSuffix(urlPath, "/") {
  244. urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1)
  245. }
  246. // 排除路径处理,到这里nextFlag为true
  247. for _, excludePath := range authExcludePaths {
  248. tmpPath := excludePath
  249. // 前缀匹配
  250. if strings.HasSuffix(tmpPath, "/*") {
  251. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2)
  252. if gstr.HasPrefix(urlPath, tmpPath) {
  253. // 前缀匹配不拦截
  254. return false
  255. }
  256. } else {
  257. // 全路径匹配
  258. if strings.HasSuffix(tmpPath, "/") {
  259. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1)
  260. }
  261. if urlPath == tmpPath {
  262. // 全路径匹配不拦截
  263. return false
  264. }
  265. }
  266. }
  267. return true
  268. }
  269. // 验证token
  270. func validToken(ctx context.Context, token string) gtoken.Resp {
  271. grsp := gtoken.Resp{}
  272. if token == "" {
  273. grsp.Code = 401
  274. grsp.Msg = "valid token empty"
  275. return grsp
  276. }
  277. authService := InitMicroSrvClient("Auth", "micro_srv.auth")
  278. defer authService.Close()
  279. rsp := &comm_def.CommonMsg{}
  280. err := authService.Call(ctx, "ValidToken", token, rsp)
  281. if err != nil {
  282. g.Log().Error(err)
  283. grsp.Code = 401
  284. return grsp
  285. }
  286. grsp.Code = int(rsp.Code)
  287. grsp.Msg = rsp.Msg
  288. grsp.Data = rsp.Data
  289. return grsp
  290. }
  291. // IsAuthExclude 是否进行auth验证
  292. func IsAuthExclude(ctx context.Context) bool {
  293. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  294. flag, ok := reqMeta["authExclude"]
  295. if !ok || flag == "true" {
  296. return true
  297. }
  298. return false
  299. }
  300. // GetUserInfo 从context中获取UserInfo
  301. func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
  302. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  303. userStr, ok := reqMeta["userInfo"]
  304. if !ok {
  305. return request.UserInfo{}, errors.New("用户信息获取失败,请重新登录。")
  306. }
  307. userInfo, err := getUserInfoDataString(userStr)
  308. if err != nil {
  309. return request.UserInfo{}, errors.New("用户信息解码失败。")
  310. }
  311. return userInfo, nil
  312. }
  313. // GetTenant 从context中获取租户码
  314. func GetTenant(ctx context.Context) (string, error) {
  315. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  316. tenant, ok := reqMeta["tenant"]
  317. if !ok {
  318. return "", errors.New("不存在租户码")
  319. }
  320. return tenant, nil
  321. }
  322. // GetReqMethod 从context中获取请求方式
  323. func GetReqMethod(ctx context.Context) (string, error) {
  324. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  325. reqMethod, ok := reqMeta["reqMethod"]
  326. if !ok {
  327. return "", errors.New("获取请求方式异常")
  328. }
  329. return reqMethod, nil
  330. }
  331. // GetToken 从context中获取Token
  332. func GetToken(ctx context.Context) (string, error) {
  333. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  334. token, ok := reqMeta["__AUTH"]
  335. if !ok {
  336. return "", errors.New("token获取失败")
  337. }
  338. return token, nil
  339. }
  340. // GetBrowserInfo 从context中获取ClientIP和UserAgent
  341. func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
  342. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  343. clientIP, ok := reqMeta["clientIP"]
  344. if !ok {
  345. return "", "", errors.New("BrowserInfo获取失败")
  346. }
  347. userAgent, ok = reqMeta["userAgent"]
  348. if !ok {
  349. return "", "", errors.New("BrowserInfo获取失败")
  350. }
  351. userAgent, err = gbase64.DecodeToString(userAgent)
  352. return
  353. }
  354. // getUserInfoDataString 从userInfo字符串转换成对象
  355. func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
  356. var userInfo request.UserInfo
  357. //uuid := ""
  358. if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil {
  359. g.Log().Error(err)
  360. return userInfo, err
  361. } else {
  362. j.SetViolenceCheck(true)
  363. err = j.GetStruct("data", &userInfo)
  364. if err != nil {
  365. g.Log().Error(err)
  366. return userInfo, err
  367. }
  368. }
  369. return userInfo, nil
  370. }
  371. // SetTenant 设置租户码(传统WebAPI调用使用)
  372. func SetTenant(tenant string) context.Context {
  373. metadata := map[string]string{"tenant": tenant}
  374. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  375. }
  376. // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用)
  377. func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
  378. // 处理Auth
  379. token := getRequestToken(r)
  380. if token != "" {
  381. client.Auth(token)
  382. }
  383. // 处理租户码
  384. tenant := request.GetTenant(r)
  385. metadata := map[string]string{"tenant": tenant}
  386. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  387. }
  388. // 解析token,若无,返回空
  389. func getRequestToken(r *ghttp.Request) string {
  390. authHeader := r.Header.Get("Authorization")
  391. if authHeader != "" {
  392. parts := strings.SplitN(authHeader, " ", 2)
  393. if !(len(parts) == 2 && parts[0] == "Bearer") {
  394. return ""
  395. } else if parts[1] == "" {
  396. return ""
  397. }
  398. return parts[1]
  399. }
  400. return ""
  401. }
  402. func handError(err error) ([]byte, error) {
  403. resp := make(map[string]interface{})
  404. resp["code"] = gerror.Code(err).Code()
  405. resp["data"] = gerror.Code(err).Message()
  406. return json.Marshal(resp)
  407. }