micro_srv.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. package micro_srv
  import (
  	"context"
  	"encoding/json"
  	"errors"
  	"io"
  	"io/ioutil"
  	"net"
  	"os"
  	"path"
  	"strconv"
  	"strings"
  	"time"

  	"dashoo.cn/common_definition/comm_def"
  	"dashoo.cn/opms_libary/dynamic"
  	"dashoo.cn/opms_libary/gtoken"
  	"dashoo.cn/opms_libary/multipart"
  	"dashoo.cn/opms_libary/myerrors"
  	"dashoo.cn/opms_libary/request"
  	"github.com/gogf/gf/encoding/gbase64"
  	"github.com/gogf/gf/encoding/gjson"
  	"github.com/gogf/gf/errors/gerror"
  	"github.com/gogf/gf/frame/g"
  	"github.com/gogf/gf/net/ghttp"
  	"github.com/gogf/gf/text/gstr"
  	"github.com/gogf/gf/util/gconv"
  	"github.com/rcrowley/go-metrics"
  	consulclient "github.com/rpcxio/rpcx-consul/client"
  	"github.com/rpcxio/rpcx-consul/serverplugin"
  	"github.com/smallnest/rpcx/client"
  	"github.com/smallnest/rpcx/protocol"
  	"github.com/smallnest/rpcx/server"
  	"github.com/smallnest/rpcx/share"
  )
  34. // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string
  35. func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
  36. reg := g.Config().GetString("service_registry.registry")
  37. etcdAddr := g.Config().GetString("service_registry.server-addr")
  38. if len(args) == 2 {
  39. reg = args[0]
  40. etcdAddr = args[1]
  41. }
  42. config := g.Config().GetString(key)
  43. arr := strings.Split(config, ",")
  44. srvName := arr[0]
  45. if len(arr) == 2 { // 点对点 直连
  46. d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "")
  47. c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
  48. return c
  49. } else {
  50. if reg == "consul" { // 服务发现使用consul
  51. //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
  52. d, _ := consulclient.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
  53. //d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
  54. c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
  55. return c
  56. }
  57. }
  58. return nil
  59. }
  60. func CreateAndInitService(basePath string) *server.Server {
  61. srvAddr := g.Config().GetString("setting.bind-addr")
  62. fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
  63. etcdAddr := g.Config().GetString("service_registry.server-addr")
  64. s := server.NewServer()
  65. advertiseAddr := srvAddr
  66. if g.Config().GetBool("setting.need-advertise-addr") {
  67. advertiseAddr = g.Config().GetString("setting.advertise-addr")
  68. }
  69. g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)
  70. reg := g.Config().GetString("service_registry.registry")
  71. //if reg == "etcd" {
  72. // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  73. //}
  74. if reg == "consul" {
  75. addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  76. }
  77. if fileAddr != "" {
  78. p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
  79. s.EnableStreamService(share.StreamServiceName, p)
  80. }
  81. return s
  82. }
  83. func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
  84. r := &serverplugin.ConsulRegisterPlugin{
  85. ServiceAddress: "tcp@" + srvAddr,
  86. ConsulServers: []string{consulAddr},
  87. BasePath: basePath,
  88. Metrics: metrics.NewRegistry(),
  89. UpdateInterval: time.Minute,
  90. }
  91. err := r.Start()
  92. if err != nil {
  93. g.Log().Fatal(err)
  94. }
  95. g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
  96. s.Plugins.Add(r)
  97. }
  98. func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
  99. defer conn.Close()
  100. ctx := context.Background()
  101. ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
  102. token, err := GetToken(ctx)
  103. if err != nil {
  104. result, _ := handError(err)
  105. conn.Write(result)
  106. return
  107. }
  108. resp := validToken(ctx, token)
  109. if resp.Code != 0 {
  110. result, _ := handError(myerrors.AuthError())
  111. conn.Write(result)
  112. return
  113. }
  114. args.Meta["userInfo"] = resp.DataString()
  115. ctx = context.WithValue(ctx, share.ReqMetaDataKey, args.Meta)
  116. var array = []*multipart.MultipartFile{}
  117. for {
  118. // 读取文件名长度
  119. buf := make([]byte, 3)
  120. conn.Read(buf)
  121. length, _ := strconv.Atoi(strings.TrimSpace(string(buf)))
  122. // 读取文件名
  123. fileHeader := make([]byte, length)
  124. _, err = conn.Read(fileHeader)
  125. headers := strings.Split(string(fileHeader), " ")
  126. paramName := headers[0]
  127. fileName := headers[1]
  128. fileSize := headers[2]
  129. //获取文件后缀
  130. suffix := path.Ext(fileName)
  131. tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
  132. if err != nil {
  133. g.Log().Error(err)
  134. return
  135. }
  136. defer os.Remove(tmpFile.Name())
  137. size, _ := strconv.Atoi(fileSize)
  138. buf = make([]byte, size)
  139. conn.Read(buf)
  140. tmpFile.Write(buf)
  141. file := &multipart.MultipartFile{FileName: paramName, FileSize: gconv.Int64(fileSize), File: tmpFile, Meta: args.Meta}
  142. _ = append(array, file)
  143. // 判断是否结束
  144. isEnd := make([]byte, 1)
  145. _, err = conn.Read(isEnd)
  146. if err != nil || isEnd[0] == '1' {
  147. continue
  148. }
  149. if isEnd[0] == '2' {
  150. break
  151. }
  152. }
  153. result := make([]byte, 0)
  154. className, _ := args.Meta["reqService"]
  155. methodName, _ := args.Meta["reqMethod"]
  156. message := new(dynamic.Message)
  157. message.ClassName = className
  158. message.MethodName = methodName
  159. message.Payload = array
  160. rsp, err := dynamic.Invoker.HandleInvoker(ctx, message)
  161. if err != nil {
  162. resp := make(map[string]interface{})
  163. resp["code"] = 500
  164. resp["data"] = err.Error()
  165. result, _ = json.Marshal(resp)
  166. } else {
  167. result, _ = json.Marshal(rsp.Payload)
  168. }
  169. conn.Write(result)
  170. conn.Close()
  171. }
  172. func getTenant(msg *protocol.Message) string {
  173. var tenant string
  174. if msg.Metadata != nil {
  175. tenant = msg.Metadata["tenant"]
  176. }
  177. return tenant
  178. }
  179. // HandleAuth 处理Auth认证
  180. func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
  181. reqPath := "/" + req.ServicePath + "/" + req.ServiceMethod
  182. tenant := getTenant(req)
  183. g.Log().Info("Received " + reqPath + " request @ " + tenant)
  184. if authPath(reqPath, authExcludePaths) {
  185. req.Metadata["authExclude"] = "false"
  186. var rsp gtoken.Resp
  187. notAuthSrv := ctx.Value("NotAuthSrv")
  188. if notAuthSrv != nil && notAuthSrv.(bool) {
  189. rsp = gtoken.GFToken.ValidToken(token)
  190. } else {
  191. ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{"tenant": tenant})
  192. rsp = validToken(ctx, token)
  193. }
  194. if rsp.Code != 0 && rsp.Code != 200 {
  195. return myerrors.AuthError()
  196. }
  197. if req.Metadata != nil {
  198. req.Metadata["userInfo"] = rsp.DataString()
  199. }
  200. return nil
  201. }
  202. return nil
  203. }
  204. // 判断路径是否需要进行认证拦截
  205. // return true 需要认证
  206. func authPath(urlPath string, authExcludePaths []string) bool {
  207. // 去除后斜杠
  208. if strings.HasSuffix(urlPath, "/") {
  209. urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1)
  210. }
  211. // 排除路径处理,到这里nextFlag为true
  212. for _, excludePath := range authExcludePaths {
  213. tmpPath := excludePath
  214. // 前缀匹配
  215. if strings.HasSuffix(tmpPath, "/*") {
  216. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2)
  217. if gstr.HasPrefix(urlPath, tmpPath) {
  218. // 前缀匹配不拦截
  219. return false
  220. }
  221. } else {
  222. // 全路径匹配
  223. if strings.HasSuffix(tmpPath, "/") {
  224. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1)
  225. }
  226. if urlPath == tmpPath {
  227. // 全路径匹配不拦截
  228. return false
  229. }
  230. }
  231. }
  232. return true
  233. }
  234. // 验证token
  235. func validToken(ctx context.Context, token string) gtoken.Resp {
  236. grsp := gtoken.Resp{}
  237. if token == "" {
  238. grsp.Code = 401
  239. grsp.Msg = "valid token empty"
  240. return grsp
  241. }
  242. authService := InitMicroSrvClient("Auth", "micro_srv.auth")
  243. defer authService.Close()
  244. rsp := &comm_def.CommonMsg{}
  245. err := authService.Call(ctx, "ValidToken", token, rsp)
  246. if err != nil {
  247. g.Log().Error(err)
  248. grsp.Code = 401
  249. return grsp
  250. }
  251. grsp.Code = int(rsp.Code)
  252. grsp.Msg = rsp.Msg
  253. grsp.Data = rsp.Data
  254. return grsp
  255. }
  256. // IsAuthExclude 是否进行auth验证
  257. func IsAuthExclude(ctx context.Context) bool {
  258. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  259. flag, ok := reqMeta["authExclude"]
  260. if !ok || flag == "true" {
  261. return true
  262. }
  263. return false
  264. }
  265. // GetUserInfo 从context中获取UserInfo
  266. func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
  267. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  268. userStr, ok := reqMeta["userInfo"]
  269. if !ok {
  270. return request.UserInfo{}, errors.New("用户信息获取失败,请重新登录。")
  271. }
  272. userInfo, err := getUserInfoDataString(userStr)
  273. if err != nil {
  274. return request.UserInfo{}, errors.New("用户信息解码失败。")
  275. }
  276. return userInfo, nil
  277. }
  278. // GetTenant 从context中获取租户码
  279. func GetTenant(ctx context.Context) (string, error) {
  280. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  281. tenant, ok := reqMeta["tenant"]
  282. if !ok {
  283. return "", errors.New("不存在租户码")
  284. }
  285. return tenant, nil
  286. }
  287. // GetReqMethod 从context中获取请求方式
  288. func GetReqMethod(ctx context.Context) (string, error) {
  289. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  290. reqMethod, ok := reqMeta["reqMethod"]
  291. if !ok {
  292. return "", errors.New("获取请求方式异常")
  293. }
  294. return reqMethod, nil
  295. }
  296. // GetToken 从context中获取Token
  297. func GetToken(ctx context.Context) (string, error) {
  298. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  299. token, ok := reqMeta["__AUTH"]
  300. if !ok {
  301. return "", errors.New("token获取失败")
  302. }
  303. return token, nil
  304. }
  305. // GetBrowserInfo 从context中获取ClientIP和UserAgent
  306. func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
  307. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  308. clientIP, ok := reqMeta["clientIP"]
  309. if !ok {
  310. return "", "", errors.New("BrowserInfo获取失败")
  311. }
  312. userAgent, ok = reqMeta["userAgent"]
  313. if !ok {
  314. return "", "", errors.New("BrowserInfo获取失败")
  315. }
  316. userAgent, err = gbase64.DecodeToString(userAgent)
  317. return
  318. }
  319. // getUserInfoDataString 从userInfo字符串转换成对象
  320. func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
  321. var userInfo request.UserInfo
  322. //uuid := ""
  323. if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil {
  324. g.Log().Error(err)
  325. return userInfo, err
  326. } else {
  327. j.SetViolenceCheck(true)
  328. err = j.GetStruct("data", &userInfo)
  329. if err != nil {
  330. g.Log().Error(err)
  331. return userInfo, err
  332. }
  333. }
  334. return userInfo, nil
  335. }
  336. // SetTenant 设置租户码(传统WebAPI调用使用)
  337. func SetTenant(tenant string) context.Context {
  338. metadata := map[string]string{"tenant": tenant}
  339. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  340. }
  341. // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用)
  342. func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
  343. // 处理Auth
  344. token := getRequestToken(r)
  345. if token != "" {
  346. client.Auth(token)
  347. }
  348. // 处理租户码
  349. tenant := request.GetTenant(r)
  350. metadata := map[string]string{"tenant": tenant}
  351. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  352. }
  353. // 解析token,若无,返回空
  354. func getRequestToken(r *ghttp.Request) string {
  355. authHeader := r.Header.Get("Authorization")
  356. if authHeader != "" {
  357. parts := strings.SplitN(authHeader, " ", 2)
  358. if !(len(parts) == 2 && parts[0] == "Bearer") {
  359. return ""
  360. } else if parts[1] == "" {
  361. return ""
  362. }
  363. return parts[1]
  364. }
  365. return ""
  366. }
  367. func handError(err error) ([]byte, error) {
  368. resp := make(map[string]interface{})
  369. resp["code"] = gerror.Code(err).Code()
  370. resp["data"] = gerror.Code(err).Message()
  371. return json.Marshal(resp)
  372. }