前段时间重构jenkins项目,将传统jenkins流水线重构成更直观的blueocean界面,突然发现一个基于vue开源的前端界面开源项目[vue-pipeline](https://github.com/jinfang134/vue-pipeline),用两周时间给它写了个后端(纯原创),怎奈何功能被ban掉了,so,所以把关键代码和实现逻辑输出成一篇文章记录一下 + JenkinsKeepalive方法实现效果是抢占式分布式锁,服务启动拿到的serviceOnlyId用来做唯一标识 + 服务注册上立马扫描redis现存未加锁的任务进行抢占 + 续租机制,每秒钟heartbeatTicker续租手中的lockKey,如果服务挂掉或超时,redis过期时间到了自然释放锁 + 代码 ```golang func (svc *TaskSvc) JenkinsKeepalive(serviceOnlyId string) error { ctx := svc.ctx ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() var lockKeys []string flagList := configs.GlobalConfig.Jenkins.InitStartList for _, item := range flagList { var cursor uint64 var lockKeysByFlag []string for { var keys []string var err error keys, cursor, err = redis.Rdb.Scan(ctx, cursor, constant.JENKINS_PREFIX+item+":"+"lock:"+"*", 10).Result() if err != nil { return err } lockKeysByFlag = append(lockKeysByFlag, keys...) if cursor == 0 { break } } lockKeys = append(lockKeys, lockKeysByFlag...) } var lockKeySet sync.Map for _, key := range lockKeys { lockKeySet.Store(key, struct{}{}) } keyChan := make(chan string) lockTicker := time.NewTicker(2 * time.Second) defer lockTicker.Stop() var wg sync.WaitGroup heartbeat := func(lockKey string, stopChan chan struct{}) { heartbeatTicker := time.NewTicker(1 * time.Second) defer heartbeatTicker.Stop() for { select { case <-heartbeatTicker.C: redis.Rdb.Expire(ctx, lockKey, 5*time.Second) case <-stopChan: return } } } for i := 0; i < configs.GlobalConfig.Scheduler.MaxWorker; i++ { wg.Add(1) go func() { defer wg.Done() for k := range keyChan { // k: jenkins:queue:jenkins-package:build_java:321 kSplitList := strings.Split(k, ":") if len(kSplitList) != 5 { logger.Errorf("Get redis key split length is faild! key=%s", k) continue } lockKey := kSplitList[0] + ":" + kSplitList[1] + ":" + kSplitList[2] + ":" + "lock:" + kSplitList[3] + ":" + kSplitList[4] if _, exists := lockKeySet.Load(lockKey); !exists { success, err := redis.Rdb.SetNX(ctx, lockKey, serviceOnlyId, 5*time.Second).Result() if err != nil || !success { continue } stopChan := make(chan struct{}) go heartbeat(lockKey, stopChan) time.Sleep(10 * time.Second) close(stopChan) redis.Rdb.Del(ctx, lockKey) } } }() } for { select { case <-ticker.C: var idleKeys []string for _, item := range flagList { var cursor uint64 var idleKeysByFlag []string for { var keys []string var err error keys, cursor, err = redis.Rdb.Scan(ctx, cursor, constant.JENKINS_PREFIX+item+":"+"*", 10).Result() if err != nil { return err } idleKeysByFlag = append(idleKeysByFlag, keys...) if cursor == 0 { break } } idleKeys = append(idleKeys, idleKeysByFlag...) } for _, key := range idleKeys { // key: jenkins:queue:jenkins-package:build_java:321 keySplitList := strings.Split(key, ":") if len(keySplitList) != 5 { continue } if _, exists := lockKeySet.Load(keySplitList[0] + ":" + keySplitList[1] + ":" + keySplitList[2] + ":" + "lock:" + keySplitList[3] + ":" + keySplitList[4]); !exists { keyChan <- key } } case <-lockTicker.C: lockKeySet.Range(func(key, value interface{}) bool { lockKeySet.Delete(key) return false }) case <-ctx.Done(): close(keyChan) return nil } } // 等待所有 goroutine 完成 wg.Wait() return nil } ``` + JenkinsConsumer主要来消费和抢占拿到锁的jenkins任务 ```golang func (svc *TaskSvc) JenkinsConsumer(serviceOnlyId string) error { ctx := svc.ctx flagList := configs.GlobalConfig.Jenkins.InitStartList ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: var lockKeys []string for _, item := range flagList { var cursor uint64 var lockKeysByFlag []string for { var keys []string var err error keys, cursor, err = redis.Rdb.Scan(ctx, cursor, constant.JENKINS_PREFIX+item+":"+"lock:"+"*", 10).Result() if err != nil { return err } lockKeysByFlag = append(lockKeysByFlag, keys...) if cursor == 0 { break } } lockKeys = append(lockKeys, lockKeysByFlag...) } for _, lockKey := range lockKeys { value, err := redis.Rdb.Get(ctx, lockKey).Result() if err != nil { if err == redis.Nil { continue } logger.Errorf("Failed to get value for lock key %s: %v", lockKey, err) continue } if value == serviceOnlyId { logger.Debugf("Lock key: %s already has the expected value: %s", lockKey, serviceOnlyId) handleKey := strings.ReplaceAll(lockKey, "lock:", "") err := svc.JenkinsScheduler(handleKey) if err != nil { logger.Errorf("JenkinsScheduler failed for handleKey %s! error=%v", handleKey, err) } continue } } case <-ctx.Done(): logger.Infof("Context cancelled, exiting JenkinsConsumer") return nil } } } ``` + 上面消费者中获取到抢占上的锁后MonitorCompletion查看状态是否更新 + 如果状态更新:写库,删掉redis jenkins任务记录 + 未更新:忽略 + JenkinsScheduler ```golang func (svc *TaskSvc) JenkinsScheduler(handleKey string) error { pipelineResult := make(chan string, 1) defer close(pipelineResult) // handleKey: jenkins:queue:jenkins-package:build_java:321 subKeyList := strings.Split(handleKey, ":") if len(subKeyList) != 5 { return eris.New("get redis handle key length is failed") } mode := subKeyList[2] pipeline := subKeyList[3] runId := subKeyList[4] eg := errgroup.Group{} eg.Go(func() error { var req jenkins.GetResultOption req.Pipelines = pipeline req.RunID = runId logger.Debugf("Starting MonitorCompletion for pipeline: %s, runId: %s", pipeline, runId) err := action.MonitorCompletion(pipelineResult, req, mode) if err != nil { logger.Errorf("MonitorCompletion failed: %v", err) return err } logger.Debugf("MonitorCompletion completed for pipeline: %s, runId: %s", pipeline, runId) return nil }) ctxWithTimeout, cancel := context.WithTimeout(svc.ctx, 30*time.Second) defer cancel() if err := eg.Wait(); err != nil { return err } select { case result := <-pipelineResult: var idMap sync.Map keys := []string{"serviceId", "deployId", "action"} hGetResp, err := redis.Rdb.HMGet(svc.ctx, handleKey, keys...).Result() if err != nil { return err } for i, value := range hGetResp { if value == nil { continue } idMap.Store(keys[i], value.(string)) } serviceId, _ := idMap.Load("serviceId") deployId, _ := idMap.Load("deployId") actionValue, _ := idMap.Load("action") if serviceId == nil || deployId == nil || actionValue == nil { return nil } if serviceId.(string) == "" || deployId.(string) == "" || actionValue.(string) == "" { return nil } err = svc.cDao.UpdateActionStatus(svc.ctx, dao.UpdateActionOption{ RunId: runId, DeployId: deployId.(string), ServiceId: serviceId.(string), Status: getResultStatus(result, actionValue.(string)), }, func(delRunId string, deployId string) error { status, err := svc.cDao.GetActionStatus(svc.ctx, deployId) if err != nil { logger.Debugf("Not generate deployments row, skip msg: =%v", err) return nil } if strings.HasSuffix(status, "ing") { return nil } redisKey := strings.Join(subKeyList[:4], ":") + ":" + delRunId redis.Rdb.Del(svc.ctx, redisKey) return nil }) if err != nil { return eris.Errorf("jenkins result update db failed! error=%v", err) } case <-ctxWithTimeout.Done(): return eris.New("timeout waiting for MonitorCompletion result") } return nil } func getResultStatus(result string, action string) string { if result == "SUCCESS" { if action == "package" { return "package_success" } } else { if action == "package" { return "package_failed" } } logger.Errorf("Unknown result: %s, action: %s", result, action) return "unknown" } ```