|
@@ -37,8 +37,27 @@ public class TaskQueueZkImpl implements ITaskQueue {
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
|
|
|
|
|
|
+ private final ZookeeperOperator zookeeperOperator;
|
|
|
+
|
|
|
@Autowired
|
|
|
- private ZookeeperOperator zookeeperOperator;
|
|
|
+ public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
|
|
|
+ this.zookeeperOperator = zookeeperOperator;
|
|
|
+
|
|
|
+ try {
|
|
|
+ String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
|
|
|
+ String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
|
|
|
+
|
|
|
+ for(String key : new String[]{tasksQueuePath,tasksCancelPath}){
|
|
|
+ if(!zookeeperOperator.isExisted(key)){
|
|
|
+ zookeeperOperator.persist(key, "");
|
|
|
+ logger.info("create tasks queue parent node success : {}", key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("create tasks queue parent node failure", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
|
|
|
* get all tasks from tasks queue
|