micro_srv.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package micro_srv
  2. import (
  3. "context"
  4. "dashoo.cn/opms_libary/dynamic"
  5. "dashoo.cn/opms_libary/gtoken"
  6. "dashoo.cn/opms_libary/mutipart"
  7. "dashoo.cn/opms_libary/request"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/rcrowley/go-metrics"
  12. "io/ioutil"
  13. "net"
  14. "os"
  15. "path"
  16. "reflect"
  17. "strconv"
  18. "strings"
  19. "time"
  20. "github.com/gogf/gf/encoding/gbase64"
  21. "github.com/gogf/gf/encoding/gjson"
  22. "github.com/gogf/gf/frame/g"
  23. "github.com/gogf/gf/net/ghttp"
  24. "github.com/gogf/gf/text/gstr"
  25. "github.com/smallnest/rpcx/client"
  26. "github.com/smallnest/rpcx/protocol"
  27. "github.com/smallnest/rpcx/server"
  28. "github.com/smallnest/rpcx/serverplugin"
  29. "github.com/smallnest/rpcx/share"
  30. "dashoo.cn/common_definition/auth"
  31. )
  32. var (
  33. fileTransfer = reflect.TypeOf((*mutipart.FileTransfer)(nil)).Elem()
  34. )
  35. // InitMicroSrvClient 获取微服务客户端,arg为可选参数,若有必须是两个,分别是:reg string, serverAddr string
  36. func InitMicroSrvClient(serviceName, key string, args ...string) (c client.XClient) {
  37. reg := g.Config().GetString("service_registry.registry")
  38. etcdAddr := g.Config().GetString("service_registry.server-addr")
  39. if len(args) == 2 {
  40. reg = args[0]
  41. etcdAddr = args[1]
  42. }
  43. config := g.Config().GetString(key)
  44. arr := strings.Split(config, ",")
  45. srvName := arr[0]
  46. if len(arr) == 2 { // 点对点 直连
  47. d, _ := client.NewPeer2PeerDiscovery("tcp@"+arr[1], "")
  48. c = client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
  49. return c
  50. } else {
  51. if reg == "consul" { // 服务发现使用consul
  52. //d, _ := etcd_client.NewEtcdV3Discovery(srvName, serviceName, []string{etcdAddr}, nil)
  53. d, _ := client.NewConsulDiscovery(srvName, serviceName, []string{etcdAddr}, nil)
  54. c = client.NewXClient(serviceName, client.Failover, client.RoundRobin, d, client.DefaultOption)
  55. return c
  56. }
  57. }
  58. return nil
  59. }
  60. func CreateAndInitService(basePath string) *server.Server {
  61. srvAddr := g.Config().GetString("setting.bind-addr")
  62. fileAddr := g.Config().GetString("setting.bind-mutipart-addr")
  63. etcdAddr := g.Config().GetString("service_registry.server-addr")
  64. s := server.NewServer()
  65. if fileAddr != "" {
  66. p := server.NewStreamService(fileAddr, streamHandler, nil, 1000)
  67. s.EnableStreamService(share.StreamServiceName, p)
  68. }
  69. advertiseAddr := srvAddr
  70. if g.Config().GetBool("setting.need-advertise-addr") {
  71. advertiseAddr = g.Config().GetString("setting.advertise-addr")
  72. }
  73. g.Log().Infof("服务启动, basePath: %v, MicorSrv: %v", basePath, srvAddr)
  74. reg := g.Config().GetString("service_registry.registry")
  75. //if reg == "etcd" {
  76. // addEtcdRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  77. //}
  78. if reg == "consul" {
  79. addConsulRegistryPlugin(s, basePath, advertiseAddr, etcdAddr)
  80. }
  81. return s
  82. }
  83. func addConsulRegistryPlugin(s *server.Server, basePath, srvAddr, consulAddr string) {
  84. r := &serverplugin.ConsulRegisterPlugin{
  85. ServiceAddress: "tcp@" + srvAddr,
  86. ConsulServers: []string{consulAddr},
  87. BasePath: basePath,
  88. Metrics: metrics.NewRegistry(),
  89. UpdateInterval: time.Minute,
  90. }
  91. err := r.Start()
  92. if err != nil {
  93. g.Log().Fatal(err)
  94. }
  95. g.Log().Infof("注册到Consul: %v, basePath: %v, MicorSrv: %v", consulAddr, basePath, srvAddr)
  96. s.Plugins.Add(r)
  97. }
  98. func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
  99. defer conn.Close()
  100. fileName := args.Meta["file_name"]
  101. suffix := path.Ext(fileName)
  102. tmpFile, err := ioutil.TempFile(os.TempDir(), "multipart-*"+suffix)
  103. if err != nil {
  104. g.Log().Error(err)
  105. return
  106. }
  107. defer os.Remove(tmpFile.Name())
  108. fileSize := args.Meta["file_size"]
  109. size, _ := strconv.Atoi(fileSize)
  110. total := 0
  111. buf := make([]byte, 4096)
  112. for {
  113. n, _ := conn.Read(buf)
  114. total += n
  115. _, err = tmpFile.Write(buf)
  116. if err != nil {
  117. g.Log().Error(err)
  118. return
  119. }
  120. //如果实际总接受字节数与客户端给的要传输字节数相等,说明传输完毕
  121. if total == size {
  122. beanName, ok := args.Meta["bean_name"]
  123. if ok {
  124. result, err := handDynamicService(beanName, tmpFile, args.Meta)
  125. if err != nil {
  126. resp := make(map[string]interface{})
  127. resp["code"] = 500
  128. resp["data"] = err.Error()
  129. result, _ = json.Marshal(resp)
  130. }
  131. conn.Write(result)
  132. } else {
  133. es := errors.New(fmt.Sprintf("Service[%s] 没有实现HandleFileTransfer接口", args.Meta["bean_name"]))
  134. g.Log().Error(es)
  135. resp := make(map[string]interface{})
  136. resp["code"] = 500
  137. resp["data"] = es.Error()
  138. result, _ := json.Marshal(resp)
  139. conn.Write(result)
  140. }
  141. break
  142. }
  143. }
  144. conn.Close()
  145. }
  146. func handDynamicService(beanName string, file *os.File, param map[string]string) ([]byte, error) {
  147. bean := dynamic.BeanFactory.GetBean(beanName)
  148. if !bean.IsValid() {
  149. ce := errors.New(fmt.Sprintf("Service[%s] 没有注册", beanName))
  150. g.Log().Error(ce)
  151. return nil, ce
  152. }
  153. typ := bean.Type()
  154. if bean.CanInterface() && typ.Implements(fileTransfer) {
  155. service := bean.Interface().(mutipart.FileTransfer)
  156. resp, err := service.HandleFileTransfer(file, param)
  157. if err != nil {
  158. ce := errors.New(fmt.Sprintf("Service[%s] 文件处理异常", beanName))
  159. g.Log().Error(ce, err)
  160. return nil, ce
  161. } else {
  162. return json.Marshal(resp)
  163. }
  164. } else {
  165. ce := errors.New(fmt.Sprintf("Service[%s] 没有实现HandleFileTransfer接口", beanName))
  166. g.Log().Error(ce)
  167. return nil, ce
  168. }
  169. }
  170. // HandleAuth 处理Auth认证
  171. func HandleAuth(ctx context.Context, req *protocol.Message, token string, authExcludePaths []string) error {
  172. path := "/" + req.ServicePath + "/" + req.ServiceMethod
  173. //g.Log().Info("reqPath: ", path)
  174. //g.Log().Info("token: ", token)
  175. req.Metadata["authExclude"] = "true"
  176. if authPath(path, authExcludePaths) {
  177. req.Metadata["authExclude"] = "false"
  178. var rsp gtoken.Resp
  179. notAuthSrv := ctx.Value("NotAuthSrv")
  180. if notAuthSrv != nil && notAuthSrv.(bool) {
  181. rsp = gtoken.GFToken.ValidToken(token)
  182. } else {
  183. rsp = validToken(token)
  184. }
  185. //return errors.New("InvalidToken")
  186. if rsp.Code != 0 {
  187. return errors.New("InvalidToken")
  188. }
  189. //userInfo, err := getUserInfoFromToken(rsp)
  190. //if err!=nil{
  191. // return err
  192. //}
  193. //g.Dump(userInfo)
  194. if req.Metadata != nil {
  195. req.Metadata["userInfo"] = rsp.DataString()
  196. }
  197. return nil
  198. }
  199. return nil
  200. }
  201. // 判断路径是否需要进行认证拦截
  202. // return true 需要认证
  203. func authPath(urlPath string, authExcludePaths []string) bool {
  204. // 去除后斜杠
  205. if strings.HasSuffix(urlPath, "/") {
  206. urlPath = gstr.SubStr(urlPath, 0, len(urlPath)-1)
  207. }
  208. // 排除路径处理,到这里nextFlag为true
  209. for _, excludePath := range authExcludePaths {
  210. tmpPath := excludePath
  211. // 前缀匹配
  212. if strings.HasSuffix(tmpPath, "/*") {
  213. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-2)
  214. if gstr.HasPrefix(urlPath, tmpPath) {
  215. // 前缀匹配不拦截
  216. return false
  217. }
  218. } else {
  219. // 全路径匹配
  220. if strings.HasSuffix(tmpPath, "/") {
  221. tmpPath = gstr.SubStr(tmpPath, 0, len(tmpPath)-1)
  222. }
  223. if urlPath == tmpPath {
  224. // 全路径匹配不拦截
  225. return false
  226. }
  227. }
  228. }
  229. return true
  230. }
  231. // 验证token
  232. func validToken(token string) gtoken.Resp {
  233. grsp := gtoken.Resp{}
  234. if token == "" {
  235. grsp.Code = 401
  236. grsp.Msg = "valid token empty"
  237. return grsp
  238. }
  239. authService := InitMicroSrvClient("Admin", "micro_srv.auth")
  240. defer authService.Close()
  241. rsp := &auth.Response{}
  242. err := authService.Call(context.TODO(), "ValidToken", token, rsp)
  243. if err != nil {
  244. g.Log().Error(err)
  245. grsp.Code = 401
  246. return grsp
  247. }
  248. grsp.Code = int(rsp.Code)
  249. grsp.Msg = rsp.Msg
  250. grsp.Data = rsp.Data
  251. return grsp
  252. }
  253. // IsAuthExclude 是否进行auth验证
  254. func IsAuthExclude(ctx context.Context) bool {
  255. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  256. flag, ok := reqMeta["authExclude"]
  257. if !ok || flag == "true" {
  258. return true
  259. }
  260. return false
  261. }
  262. // GetUserInfo 从context中获取UserInfo
  263. func GetUserInfo(ctx context.Context) (request.UserInfo, error) {
  264. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  265. userStr, ok := reqMeta["userInfo"]
  266. if !ok {
  267. return request.UserInfo{}, errors.New("不存在UserInfo数据")
  268. }
  269. userInfo, err := getUserInfoDataString(userStr)
  270. if err != nil {
  271. return request.UserInfo{}, err
  272. }
  273. return userInfo, nil
  274. }
  275. // GetTenant 从context中获取租户码
  276. func GetTenant(ctx context.Context) (string, error) {
  277. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  278. tenant, ok := reqMeta["tenant"]
  279. if !ok {
  280. return "", errors.New("不存在租户码")
  281. }
  282. return tenant, nil
  283. }
  284. // GetReqMethod 从context中获取请求方式
  285. func GetReqMethod(ctx context.Context) (string, error) {
  286. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  287. reqMethod, ok := reqMeta["reqMethod"]
  288. if !ok {
  289. return "", errors.New("获取请求方式异常")
  290. }
  291. return reqMethod, nil
  292. }
  293. // GetToken 从context中获取Token
  294. func GetToken(ctx context.Context) (string, error) {
  295. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  296. token, ok := reqMeta["__AUTH"]
  297. if !ok {
  298. return "", errors.New("token获取失败")
  299. }
  300. return token, nil
  301. }
  302. // GetBrowserInfo 从context中获取ClientIP和UserAgent
  303. func GetBrowserInfo(ctx context.Context) (clientIP string, userAgent string, err error) {
  304. reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  305. clientIP, ok := reqMeta["clientIP"]
  306. if !ok {
  307. return "", "", errors.New("BrowserInfo获取失败")
  308. }
  309. userAgent, ok = reqMeta["userAgent"]
  310. if !ok {
  311. return "", "", errors.New("BrowserInfo获取失败")
  312. }
  313. userAgent, err = gbase64.DecodeToString(userAgent)
  314. return
  315. }
  316. // getUserInfoDataString 从userInfo字符串转换成对象
  317. func getUserInfoDataString(userInfoString string) (request.UserInfo, error) {
  318. var userInfo request.UserInfo
  319. //uuid := ""
  320. if j, err := gjson.DecodeToJson([]byte(userInfoString)); err != nil {
  321. g.Log().Error(err)
  322. return userInfo, err
  323. } else {
  324. j.SetViolenceCheck(true)
  325. err = j.GetStruct("data", &userInfo)
  326. if err != nil {
  327. g.Log().Error(err)
  328. return userInfo, err
  329. }
  330. }
  331. return userInfo, nil
  332. }
  333. // SetTenant 设置租户码(传统WebAPI调用使用)
  334. func SetTenant(tenant string) context.Context {
  335. metadata := map[string]string{"tenant": tenant}
  336. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  337. }
  338. // SetTenantAndAuth 设置租户码和认证信息(传统WebAPI调用使用)
  339. func SetTenantAndAuth(r *ghttp.Request, client client.XClient) context.Context {
  340. // 处理Auth
  341. token := getRequestToken(r)
  342. if token != "" {
  343. client.Auth(token)
  344. }
  345. // 处理租户码
  346. tenant := request.GetTenant(r)
  347. metadata := map[string]string{"tenant": tenant}
  348. return context.WithValue(context.Background(), share.ReqMetaDataKey, metadata)
  349. }
  350. // 解析token,若无,返回空
  351. func getRequestToken(r *ghttp.Request) string {
  352. authHeader := r.Header.Get("Authorization")
  353. if authHeader != "" {
  354. parts := strings.SplitN(authHeader, " ", 2)
  355. if !(len(parts) == 2 && parts[0] == "Bearer") {
  356. return ""
  357. } else if parts[1] == "" {
  358. return ""
  359. }
  360. return parts[1]
  361. }
  362. return ""
  363. }