Jelajahi Sumber

[fix] remove spring bean circular dependence

tjq 4 tahun lalu
induk
melakukan
53a99241e0

+ 1 - 1
others/script/jenkins_auto_build.sh

@@ -25,7 +25,7 @@ docker run -d \
        --restart=always \
        --name powerjob-server \
        -p 7700:7700 -p 10086:10086 \
-       -e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://127.0.0.1:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://127.0.0.1:27017/powerjob-product" \
+       -e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://124.70.67.79:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://124.70.67.79:27017/powerjob-product" \
        -v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \
        tjqq/powerjob-server:latest
 echo "================== powerjob-client 启动完成 =================="

+ 6 - 0
powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java

@@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
 import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
 import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
+import com.github.kfcfans.powerjob.server.service.instance.InstanceMetaInfoService;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -39,6 +40,8 @@ public class DispatchService {
     @Resource
     private InstanceManager instanceManager;
     @Resource
+    private InstanceMetaInfoService instanceMetaInfoService;
+    @Resource
     private InstanceInfoRepository instanceInfoRepository;
 
     private static final Splitter commaSplitter = Splitter.on(",");
@@ -142,5 +145,8 @@ public class DispatchService {
 
         // 修改状态
         instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams, now);
+
+        // 装载缓存
+        instanceMetaInfoService.loadJobInfo(instanceId, jobInfo);
     }
 }

+ 10 - 9
powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java

@@ -11,7 +11,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
 import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogDO;
 import com.github.kfcfans.powerjob.server.persistence.local.LocalInstanceLogRepository;
 import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
-import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
+import com.github.kfcfans.powerjob.server.service.instance.InstanceMetaInfoService;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -46,7 +46,7 @@ import java.util.stream.Stream;
 public class InstanceLogService {
 
     @Resource
-    private InstanceManager instanceManager;
+    private InstanceMetaInfoService instanceMetaInfoService;
     @Resource
     private GridFsManager gridFsManager;
     // 本地数据库操作bean
@@ -317,16 +317,17 @@ public class InstanceLogService {
     @Scheduled(fixedDelay = 60000)
     public void timingCheck() {
 
+        // TODO: 检查 lastReportTime,过期 instance 调用 sync 同步并删除
+
         // 定时删除秒级任务的日志
         List<Long> frequentInstanceIds = Lists.newLinkedList();
         instanceId2LastReportTime.keySet().forEach(instanceId -> {
-            JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId);
-            if (jobInfo == null) {
-                return;
-            }
-
-            if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
-                frequentInstanceIds.add(instanceId);
+            try {
+                JobInfoDO jobInfo = instanceMetaInfoService.fetchJobInfoByInstanceId(instanceId);
+                if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
+                    frequentInstanceIds.add(instanceId);
+                }
+            }catch (Exception ignore) {
             }
         });
 

+ 8 - 53
powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java

@@ -8,7 +8,6 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
 import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
 import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
-import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
 import com.github.kfcfans.powerjob.server.service.DispatchService;
 import com.github.kfcfans.powerjob.server.service.InstanceLogService;
 import com.github.kfcfans.powerjob.server.service.UserService;
@@ -16,8 +15,6 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
 import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent;
 import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
 import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.stereotype.Service;
@@ -25,7 +22,6 @@ import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import java.util.Date;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -38,10 +34,6 @@ import java.util.concurrent.TimeUnit;
 @Service
 public class InstanceManager {
 
-    // 存储 instanceId 对应的 Job 信息,便于重试
-    private static Cache<Long, JobInfoDO> instanceId2JobInfo;
-
-    // Spring Bean
     @Resource
     private DispatchService dispatchService;
     @Resource
@@ -49,21 +41,12 @@ public class InstanceManager {
     @Resource(name = "omsCenterAlarmService")
     private Alarmable omsCenterAlarmService;
     @Resource
-    private InstanceInfoRepository instanceInfoRepository;
+    private InstanceMetaInfoService instanceMetaInfoService;
     @Resource
-    private JobInfoRepository jobInfoRepository;
+    private InstanceInfoRepository instanceInfoRepository;
     @Resource
     private WorkflowInstanceManager workflowInstanceManager;
 
-    private static final int CACHE_CONCURRENCY_LEVEL = 8;
-    private static final int CACHE_MAX_SIZE = 4096;
-
-    static {
-        instanceId2JobInfo = CacheBuilder.newBuilder()
-                .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
-                .maximumSize(CACHE_MAX_SIZE)
-                .build();
-    }
 
     /**
      * 更新任务状态
@@ -71,14 +54,10 @@ public class InstanceManager {
      */
     public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception {
 
-        Long jobId = req.getJobId();
         Long instanceId = req.getInstanceId();
 
         // 获取相关数据
-        JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> {
-            Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
-            return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId));
-        });
+        JobInfoDO jobInfo = instanceMetaInfoService.fetchJobInfoByInstanceId(req.getInstanceId());
         InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
         if (instanceInfo == null) {
             log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId);
@@ -173,9 +152,11 @@ public class InstanceManager {
 
         // 告警
         if (status == InstanceStatus.FAILED) {
-            JobInfoDO jobInfo = fetchJobInfo(instanceId);
-            if (jobInfo == null) {
-                log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId);
+            JobInfoDO jobInfo;
+            try {
+                jobInfo = instanceMetaInfoService.fetchJobInfoByInstanceId(instanceId);
+            }catch (Exception e) {
+                log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId);
                 return;
             }
 
@@ -187,32 +168,6 @@ public class InstanceManager {
             List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
             omsCenterAlarmService.onJobInstanceFailed(content, userList);
         }
-
-        // 过期缓存
-        instanceId2JobInfo.invalidate(instanceId);
     }
 
-    /**
-     * 根据任务实例ID查询任务相关信息
-     * @param instanceId 任务实例ID
-     * @return 任务元数据
-     */
-    public JobInfoDO fetchJobInfo(Long instanceId) {
-        JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId);
-        if (jobInfo != null) {
-            return jobInfo;
-        }
-        InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
-        if (instanceInfo != null) {
-            return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null);
-        }
-        return null;
-    }
-
-    /**
-     * 释放本地缓存,防止内存泄漏
-     */
-    public static void releaseCache() {
-        instanceId2JobInfo.cleanUp();
-    }
 }

+ 67 - 0
powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceMetaInfoService.java

@@ -0,0 +1,67 @@
+package com.github.kfcfans.powerjob.server.service.instance;
+
+import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
+import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
+import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
+import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * 存储 instance 对应的 JobInfo 信息
+ *
+ * @author tjq
+ * @since 2020/6/23
+ */
+@Service
+public class InstanceMetaInfoService {
+
+    @Resource
+    private JobInfoRepository jobInfoRepository;
+    @Resource
+    private InstanceInfoRepository instanceInfoRepository;
+
+    // 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变)
+    private Cache<Long, JobInfoDO> instanceId2JobInfoCache;
+
+    private static final int CACHE_CONCURRENCY_LEVEL = 8;
+    private static final int CACHE_MAX_SIZE = 4096;
+
+    public InstanceMetaInfoService() {
+        instanceId2JobInfoCache = CacheBuilder.newBuilder()
+                .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
+                .maximumSize(CACHE_MAX_SIZE)
+                .build();
+    }
+
+    /**
+     * 根据 instanceId 获取 JobInfo
+     * @param instanceId instanceId
+     * @return JobInfoDO
+     * @throws ExecutionException 异常
+     */
+    public JobInfoDO fetchJobInfoByInstanceId(Long instanceId) throws ExecutionException {
+        return instanceId2JobInfoCache.get(instanceId, () -> {
+            InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
+            if (instanceInfo != null) {
+                Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(instanceInfo.getJobId());
+                return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobInfo by jobId: " + instanceInfo.getJobId()));
+            }
+            throw new IllegalArgumentException("can't find Instance by instanceId: " + instanceId);
+        });
+    }
+
+    /**
+     * 装载缓存
+     * @param instanceId instanceId
+     * @param jobInfoDO 原始的任务数据
+     */
+    public void loadJobInfo(Long instanceId, JobInfoDO jobInfoDO) {
+        instanceId2JobInfoCache.put(instanceId, jobInfoDO);
+    }
+}

+ 0 - 1
powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java

@@ -60,7 +60,6 @@ public class CleanService {
 
         // 释放本地缓存
         WorkerManagerService.releaseContainerInfos();
-        InstanceManager.releaseCache();
 
         // 删除数据库运行记录
         cleanInstanceLog();