QuartzExecutors.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package cn.escheduler.api.quartz;
  18. import cn.escheduler.common.Constants;
  19. import cn.escheduler.common.utils.JSONUtils;
  20. import cn.escheduler.dao.model.Schedule;
  21. import org.apache.commons.lang.StringUtils;
  22. import org.quartz.*;
  23. import org.quartz.impl.StdSchedulerFactory;
  24. import org.quartz.impl.matchers.GroupMatcher;
  25. import org.slf4j.Logger;
  26. import org.slf4j.LoggerFactory;
  27. import java.util.*;
  28. import java.util.Calendar;
  29. import java.util.concurrent.locks.ReadWriteLock;
  30. import java.util.concurrent.locks.ReentrantReadWriteLock;
  31. import static org.quartz.CronScheduleBuilder.cronSchedule;
  32. import static org.quartz.JobBuilder.newJob;
  33. import static org.quartz.TriggerBuilder.newTrigger;
  34. /**
  35. * single Quartz executors instance
  36. */
  37. public class QuartzExecutors {
  38. private static final Logger logger = LoggerFactory.getLogger(QuartzExecutors.class);
  39. private final ReadWriteLock lock = new ReentrantReadWriteLock();
  40. /**
  41. * <p>
  42. * A <code>Scheduler</code> maintains a registry of <code>{@link org.quartz.JobDetail}</code>s
  43. * and <code>{@link Trigger}</code>s. Once registered, the <code>Scheduler</code>
  44. * is responsible for executing <code>Job</code> s when their associated
  45. * <code>Trigger</code> s fire (when their scheduled time arrives).
  46. * </p>
  47. * {@link Scheduler}
  48. */
  49. private static Scheduler scheduler;
  50. private static volatile QuartzExecutors INSTANCE = null;
  51. private QuartzExecutors() {}
  52. /**
  53. * thread safe and performance promote
  54. * @return
  55. */
  56. public static QuartzExecutors getInstance() {
  57. if (INSTANCE == null) {
  58. synchronized (QuartzExecutors.class) {
  59. // when more than two threads run into the first null check same time, to avoid instanced more than one time, it needs to be checked again.
  60. if (INSTANCE == null) {
  61. INSTANCE = new QuartzExecutors();
  62. //finish QuartzExecutors init
  63. INSTANCE.init();
  64. }
  65. }
  66. }
  67. return INSTANCE;
  68. }
  69. /**
  70. * init
  71. *
  72. * <p>
  73. * Returns a client-usable handle to a <code>Scheduler</code>.
  74. * </p>
  75. */
  76. private void init() {
  77. try {
  78. SchedulerFactory schedulerFactory = new StdSchedulerFactory(Constants.QUARTZ_PROPERTIES_PATH);
  79. scheduler = schedulerFactory.getScheduler();
  80. } catch (SchedulerException e) {
  81. logger.error(e.getMessage(),e);
  82. System.exit(1);
  83. }
  84. }
  85. /**
  86. * Whether the scheduler has been started.
  87. *
  88. * <p>
  89. * Note: This only reflects whether <code>{@link #start()}</code> has ever
  90. * been called on this Scheduler, so it will return <code>true</code> even
  91. * if the <code>Scheduler</code> is currently in standby mode or has been
  92. * since shutdown.
  93. * </p>
  94. *
  95. * @see Scheduler#start()
  96. */
  97. public void start() throws SchedulerException {
  98. if (!scheduler.isStarted()){
  99. scheduler.start();
  100. logger.info("Quartz service started" );
  101. }
  102. }
  103. /**
  104. * stop all scheduled tasks
  105. *
  106. * Halts the <code>Scheduler</code>'s firing of <code>{@link Trigger}s</code>,
  107. * and cleans up all resources associated with the Scheduler. Equivalent to
  108. * <code>shutdown(false)</code>.
  109. *
  110. * <p>
  111. * The scheduler cannot be re-started.
  112. * </p>
  113. *
  114. */
  115. public void shutdown() throws SchedulerException {
  116. if (!scheduler.isShutdown()) {
  117. // don't wait for the task to complete
  118. scheduler.shutdown();
  119. logger.info("Quartz service stopped, and halt all tasks");
  120. }
  121. }
  122. /**
  123. * add task trigger , if this task already exists, return this task with updated trigger
  124. *
  125. * @param clazz job class name
  126. * @param jobName job name
  127. * @param jobGroupName job group name
  128. * @param startDate job start date
  129. * @param endDate job end date
  130. * @param cronExpression cron expression
  131. * @param jobDataMap job parameters data map
  132. * @return
  133. */
  134. public void addJob(Class<? extends Job> clazz,String jobName,String jobGroupName,Date startDate, Date endDate,
  135. String cronExpression,
  136. Map<String, Object> jobDataMap) {
  137. lock.writeLock().lock();
  138. try {
  139. JobKey jobKey = new JobKey(jobName, jobGroupName);
  140. JobDetail jobDetail;
  141. //add a task (if this task already exists, return this task directly)
  142. if (scheduler.checkExists(jobKey)) {
  143. jobDetail = scheduler.getJobDetail(jobKey);
  144. if (jobDataMap != null) {
  145. jobDetail.getJobDataMap().putAll(jobDataMap);
  146. }
  147. } else {
  148. jobDetail = newJob(clazz).withIdentity(jobKey).build();
  149. if (jobDataMap != null) {
  150. jobDetail.getJobDataMap().putAll(jobDataMap);
  151. }
  152. scheduler.addJob(jobDetail, false, true);
  153. logger.info("Add job, job name: {}, group name: {}",
  154. jobName, jobGroupName);
  155. }
  156. TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
  157. /**
  158. * Instructs the <code>{@link Scheduler}</code> that upon a mis-fire
  159. * situation, the <code>{@link CronTrigger}</code> wants to have it's
  160. * next-fire-time updated to the next time in the schedule after the
  161. * current time (taking into account any associated <code>{@link Calendar}</code>,
  162. * but it does not want to be fired now.
  163. */
  164. CronTrigger cronTrigger = newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
  165. .withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
  166. .forJob(jobDetail).build();
  167. if (scheduler.checkExists(triggerKey)) {
  168. // updateProcessInstance scheduler trigger when scheduler cycle changes
  169. CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
  170. String oldCronExpression = oldCronTrigger.getCronExpression();
  171. if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)) {
  172. // reschedule job trigger
  173. scheduler.rescheduleJob(triggerKey, cronTrigger);
  174. logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
  175. jobName, jobGroupName, cronExpression, startDate, endDate);
  176. }
  177. } else {
  178. scheduler.scheduleJob(cronTrigger);
  179. logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
  180. jobName, jobGroupName, cronExpression, startDate, endDate);
  181. }
  182. } catch (Exception e) {
  183. logger.error("add job failed", e);
  184. throw new RuntimeException("add job failed:"+e.getMessage());
  185. } finally {
  186. lock.writeLock().unlock();
  187. }
  188. }
  189. /**
  190. * delete job
  191. *
  192. * @param jobName
  193. * @param jobGroupName
  194. * @return true if the Job was found and deleted.
  195. */
  196. public boolean deleteJob(String jobName, String jobGroupName) {
  197. lock.writeLock().lock();
  198. try {
  199. JobKey jobKey = new JobKey(jobName,jobGroupName);
  200. if(scheduler.checkExists(jobKey)){
  201. logger.info("try to delete job, job name: {}, job group name: {},", jobName, jobGroupName);
  202. return scheduler.deleteJob(jobKey);
  203. }else {
  204. return true;
  205. }
  206. } catch (SchedulerException e) {
  207. logger.error(String.format("delete job : %s failed",jobName), e);
  208. } finally {
  209. lock.writeLock().unlock();
  210. }
  211. return false;
  212. }
  213. /**
  214. * delete all jobs in job group
  215. * <p>
  216. * Note that while this bulk operation is likely more efficient than
  217. * invoking <code>deleteJob(JobKey jobKey)</code> several
  218. * times, it may have the adverse affect of holding data locks for a
  219. * single long duration of time (rather than lots of small durations
  220. * of time).
  221. * </p>
  222. *
  223. * @param jobGroupName
  224. *
  225. * @return true if all of the Jobs were found and deleted, false if
  226. * one or more were not deleted.
  227. */
  228. public boolean deleteAllJobs(String jobGroupName) {
  229. lock.writeLock().lock();
  230. try {
  231. logger.info("try to delete all jobs in job group: {}", jobGroupName);
  232. List<JobKey> jobKeys = new ArrayList<>();
  233. jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName)));
  234. return scheduler.deleteJobs(jobKeys);
  235. } catch (SchedulerException e) {
  236. logger.error(String.format("delete all jobs in job group: %s failed",jobGroupName), e);
  237. } finally {
  238. lock.writeLock().unlock();
  239. }
  240. return false;
  241. }
  242. /**
  243. * build job name
  244. */
  245. public static String buildJobName(int processId) {
  246. StringBuilder sb = new StringBuilder(30);
  247. sb.append(Constants.QUARTZ_JOB_PRIFIX).append(Constants.UNDERLINE).append(processId);
  248. return sb.toString();
  249. }
  250. /**
  251. * build job group name
  252. */
  253. public static String buildJobGroupName(int projectId) {
  254. StringBuilder sb = new StringBuilder(30);
  255. sb.append(Constants.QUARTZ_JOB_GROUP_PRIFIX).append(Constants.UNDERLINE).append(projectId);
  256. return sb.toString();
  257. }
  258. /**
  259. * add params to map
  260. *
  261. * @param projectId
  262. * @param scheduleId
  263. * @param schedule
  264. * @return
  265. */
  266. public static Map<String, Object> buildDataMap(int projectId, int scheduleId, Schedule schedule) {
  267. Map<String, Object> dataMap = new HashMap<>(3);
  268. dataMap.put(Constants.PROJECT_ID, projectId);
  269. dataMap.put(Constants.SCHEDULE_ID, scheduleId);
  270. dataMap.put(Constants.SCHEDULE, JSONUtils.toJson(schedule));
  271. return dataMap;
  272. }
  273. }