Before explaining the architecture of the scheduling system, let us first understand the common terminology used in it.
DAG: Directed Acyclic Graph, abbreviated as DAG. Tasks in a workflow are assembled in the form of a directed acyclic graph, which is traversed topologically from the nodes with zero in-degree until no successor nodes remain (see the traversal sketch at the end of this glossary). For example, see the picture below:
(Figure: DAG example)
Process definition: A visualized DAG built by dragging task nodes onto the canvas and establishing the associations between them.
Process instance: An instantiation of a process definition, generated by a manual start or by scheduling. Each run of a process definition produces a new process instance.
Task instance: The instantiation of a specific task node when a process instance runs; it indicates the execution status of that specific task.
Task type: Currently supports SHELL, SQL, SUB_PROCESS (sub-process), PROCEDURE, MR, SPARK, PYTHON, and DEPENDENT (dependency), and dynamic plug-in extension is planned. Note: a SUB_PROCESS is itself a separate process definition that can be started independently.
Schedule mode: The system supports cron-based timed scheduling and manual scheduling. Supported command types are: start workflow, start execution from the current node, recover a fault-tolerant workflow, resume a paused process, start execution from the failed node, complement, timer, rerun, pause, stop, and resume a waiting thread. Of these, "recover a fault-tolerant workflow" and "resume a waiting thread" are used internally by the scheduler and cannot be called externally.
Timed schedule: The system uses the Quartz distributed scheduler and supports visual generation of cron expressions.
Dependency: The system supports not only simple predecessor/successor dependencies within a DAG, but also provides task dependency nodes, which allow custom task dependencies across processes.
Priority: Supports priorities on process instances and task instances. If no priority is set, execution defaults to first-in, first-out order.
Mail alert: Supports emailing SQL task query results, emailing process instance run results, and fault-tolerance alert notifications.
Failure policy: For tasks running in parallel, two policies are provided when a task fails. "Continue" means the tasks running in parallel keep running until the process ends, at which point the process is marked failed. "End" means that once a failed task is found, the parallel tasks that are still running are killed and the process ends.
Complement: Backfills historical data; both interval-parallel and serial complement modes are supported.
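To make the zero-in-degree traversal in the DAG definition above concrete, here is a minimal sketch of a topological traversal in Java. The map-based graph representation and node names are illustrative assumptions, not EasyScheduler's internal data structures.

```java
import java.util.*;

public class DagTraversalSketch {
    /** Returns nodes in an order where every node appears after all of its predecessors. */
    public static List<String> topologicalOrder(Map<String, List<String>> edges) {
        // count incoming edges for every node
        Map<String, Integer> inDegree = new HashMap<>();
        edges.forEach((from, tos) -> {
            inDegree.putIfAbsent(from, 0);
            for (String to : tos) {
                inDegree.merge(to, 1, Integer::sum);
            }
        });

        // start from all nodes with zero in-degree
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((node, degree) -> { if (degree == 0) ready.add(node); });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            // "execute" the node, then release its successors
            for (String next : edges.getOrDefault(node, Collections.emptyList())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        return order; // if order.size() < inDegree.size(), the graph has a cycle
    }

    public static void main(String[] args) {
        Map<String, List<String>> dag = new HashMap<>();
        dag.put("A", Arrays.asList("B", "C"));
        dag.put("B", Collections.singletonList("D"));
        dag.put("C", Collections.singletonList("D"));
        System.out.println(topologicalOrder(dag)); // e.g. [A, B, C, D]
    }
}
```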
System Architecture Diagram
MasterServer
MasterServer adopts a distributed, decentralized design. It is mainly responsible for splitting DAG tasks, submitting and monitoring tasks, and monitoring the health status of other MasterServers and WorkerServers. When the MasterServer service starts, it registers a temporary node with ZooKeeper and performs fault tolerance by listening for state changes of ZooKeeper temporary nodes.
Distributed Quartz: the distributed scheduling component, mainly responsible for starting and stopping scheduled tasks. When Quartz picks up a task, the Master uses an internal thread pool to handle the task's subsequent processing (a minimal Quartz sketch follows this list).
MasterSchedulerThread is a scanning thread that periodically scans the command table in the database and performs different business operations depending on the command type.
MasterExecThread is mainly responsible for DAG task splitting, task submission monitoring, and the logical processing of the various command types.
MasterTaskExecThread is mainly responsible for task persistence
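To illustrate the Quartz side of this flow, the following is a minimal sketch that registers a cron-triggered job with the Quartz API. The job class, names, and cron expression are assumptions for illustration only; in EasyScheduler the scheduled trigger would produce a command record for the MasterSchedulerThread to pick up from the command table, which is omitted here.

```java
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class CronScheduleSketch {

    /** Hypothetical job: stands in for the work done when a timed trigger fires. */
    public static class EnqueueCommandJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("fire time: " + context.getFireTime());
            // a real scheduler would insert a command into the command table here
        }
    }

    public static void main(String[] args) throws SchedulerException {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.start();

        JobDetail job = JobBuilder.newJob(EnqueueCommandJob.class)
                .withIdentity("process-1001", "easyscheduler")
                .build();

        // fire every 5 minutes; the UI's cron visualization would generate this expression
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger-1001", "easyscheduler")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?"))
                .build();

        scheduler.scheduleJob(job, trigger);
    }
}
```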
WorkerServer
##### This service contains:
LoggerServer is an RPC service that provides functions such as viewing log fragments, refreshing logs, and downloading logs.
ZooKeeper
Both the MasterServer and WorkerServer nodes use the ZooKeeper service for cluster management and fault tolerance. In addition, the system performs event monitoring and distributed locking based on ZooKeeper. We also implemented queues based on Redis, but since we want EasyScheduler to rely on as few components as possible, the Redis implementation was eventually removed.
The task queue operation is also provided; the queue is currently implemented on ZooKeeper as well. Since each queue entry stores only a small amount of information, there is no need to worry about too much data accumulating in the queue. In fact, we have stress-tested the queue with a million entries, with no effect on system stability or performance.
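As an illustration of such a ZooKeeper-backed queue, here is a minimal sketch using Apache Curator: each task is a child node under a queue path, and polling picks the smallest child in string order. The connection handling, paths, and key format are assumptions for illustration, not the project's actual node layout, and the sketch omits the locking a real multi-consumer queue would need.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.Collections;
import java.util.List;

public class ZkTaskQueueSketch {
    private static final String QUEUE_ROOT = "/easyscheduler/tasks_queue"; // assumed path

    private final CuratorFramework client;

    public ZkTaskQueueSketch(String connectString) {
        client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3));
        client.start();
    }

    /** Add a task by creating a child node whose name is its sortable key. */
    public void add(String taskKey) throws Exception {
        client.create().creatingParentsIfNeeded()
              .forPath(QUEUE_ROOT + "/" + taskKey, new byte[0]);
    }

    /** Poll the smallest key in string order, i.e. the highest-priority task. */
    public String poll() throws Exception {
        List<String> children = client.getChildren().forPath(QUEUE_ROOT);
        if (children.isEmpty()) {
            return null;
        }
        Collections.sort(children);
        String head = children.get(0);
        client.delete().forPath(QUEUE_ROOT + "/" + head);
        return head;
    }
}
```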
Provides alarm-related interfaces, mainly covering the storage, query, and notification of the two types of alarm data. Notifications support email and SNMP (the latter not yet implemented).
The API interface layer is mainly responsible for handling requests from the front-end UI layer. The service exposes RESTful APIs externally, including interfaces for workflow creation, definition, query, modification, release, taking offline, manual start, stop, pause, resume, starting execution from a given node, and more.
The front-end UI provides the system's various visual operation interfaces; for details, see the System User Manual section.
The centralized design concept is relatively simple: the nodes in the distributed cluster are divided into two roles according to their responsibilities:
Problems with the centralized design:
In fact, truly decentralized distributed systems are rare. Instead, dynamically centralized distributed systems keep emerging. Under this architecture, the managers in the cluster are elected dynamically rather than preset, and when the cluster fails, its nodes spontaneously hold a "meeting" to elect a new "manager" to preside over the work. The most typical cases are ZooKeeper and Etcd, which is implemented in Go.
The decentralization of EasyScheduler means that Masters and Workers register themselves with ZooKeeper. The Master cluster and the Worker cluster have no center, and a ZooKeeper distributed lock is used to elect one Master or Worker as the "manager" to perform the work.
EasyScheduler uses ZooKeeper distributed locks to ensure that only one Master executes the Scheduler at any given time, or that only one Worker performs task submission.
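A minimal sketch of this pattern using Curator's InterProcessMutex is shown below; the connection string and lock path are assumptions used only for illustration, not EasyScheduler's actual lock layout.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class MasterLockSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // assumed lock path: whichever Master acquires it runs the scheduling round
        InterProcessMutex lock = new InterProcessMutex(client, "/easyscheduler/lock/masters");
        lock.acquire();
        try {
            // only one Master reaches this section at a time:
            // e.g. scan the command table and split the DAG into task instances
            System.out.println("acquired scheduler lock, scanning commands...");
        } finally {
            lock.release();
        }
    }
}
```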
In the figure above, MainFlowThread waits for SubFlowThread1 to end, SubFlowThread1 waits for SubFlowThread2 to end, SubFlowThread2 waits for SubFlowThread3 to end, and SubFlowThread3 waits for a new thread from the thread pool, so the entire DAG process can never finish and its threads cannot be released. This forms a circular wait between child and parent processes; at this point the scheduling cluster becomes unavailable unless a new Master is started to add threads and break the deadlock.
It seems a bit unsatisfactory to start a new Master to break the deadlock, so we proposed the following three options to reduce this risk:
Note: the Master Scheduler thread fetches Commands in FIFO order.
So we chose the third way to solve the problem of insufficient threads.
Fault tolerance is divided into service fault tolerance and task retry. Service fault tolerance is divided into two types: Master Fault Tolerance and Worker Fault Tolerance.
Service fault tolerance design relies on ZooKeeper's Watcher mechanism. The implementation principle is as follows:
The Master monitors the ZooKeeper directories of other Masters and of Workers. If a remove event is detected, process instance fault tolerance or task instance fault tolerance is performed according to the specific business logic.
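A minimal sketch of such a watch-and-react loop using Curator's PathChildrenCache follows; the registry path and the printed action are assumptions for illustration, not the project's actual fault-tolerance code.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class FaultToleranceWatcherSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // assumed registry path under which each live Worker keeps an ephemeral node
        PathChildrenCache workers = new PathChildrenCache(client, "/easyscheduler/workers", true);
        workers.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                String goneWorker = event.getData().getPath();
                // a Worker's ephemeral node disappeared: its running task instances
                // would be marked as needing fault tolerance so a Master can resubmit them
                System.out.println("worker removed, starting fault tolerance for: " + goneWorker);
            }
        });
        workers.start();
    }
}
```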
After ZooKeeper-based Master fault tolerance completes, the process instance is rescheduled by the Scheduler thread in EasyScheduler. It traverses the DAG to find tasks that are "running" or "submitted successfully": for "running" tasks it monitors the status of their task instances; for "submitted successfully" tasks it first checks whether the task already exists in the Task Queue, and if so it likewise monitors the task instance status, otherwise it resubmits the task instance.
Once the Master Scheduler thread finds a task instance marked as "needs fault tolerance", it takes over the task and resubmits it.
Note: because "network jitter" may cause a node to briefly lose its ZooKeeper heartbeat and thus trigger a remove event for that node, we take the simplest approach: once a node's connection to ZooKeeper times out, its Master or Worker service is stopped directly.
Here we must first distinguish among the concepts of task failure retry, process failure recovery, and process failure rerun:
Returning to the main topic: we divide the task nodes in a workflow into two types.
Each business node can be configured with a number of failure retries: when such a task node fails, it is automatically retried until it succeeds or the configured retry count is exceeded. A logical node itself does not support failure retry, but the tasks inside it do (a simplified retry sketch follows below).
If a task in the workflow fails and reaches its maximum number of retries, the workflow fails and stops; the failed workflow can then be rerun manually or recovered from the failure.
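A simplified sketch of such a retry loop is shown below; the TaskRunner interface and parameters are hypothetical and only illustrate the retry-until-success-or-exhausted behavior described above.

```java
public class TaskRetrySketch {

    /** Hypothetical stand-in for executing one attempt of a business node. */
    interface TaskRunner {
        boolean runOnce();
    }

    /** Retry until the task succeeds or the configured number of retries is exhausted. */
    public static boolean runWithRetries(TaskRunner task, int maxRetries, long retryIntervalMs)
            throws InterruptedException {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (task.runOnce()) {
                return true;                       // task instance succeeded
            }
            if (attempt < maxRetries) {
                Thread.sleep(retryIntervalMs);     // wait before the next automatic retry
            }
        }
        return false;                              // retries exhausted: the workflow is marked failed
    }

    public static void main(String[] args) throws InterruptedException {
        boolean ok = runWithRetries(() -> Math.random() > 0.7, 3, 1000);
        System.out.println(ok ? "task succeeded" : "task failed after retries");
    }
}
```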
In the early scheduling design there was no priority or fair-scheduling mechanism, so a task submitted first might only finish at the same time as tasks submitted later, and the priority of a process or task could not be set. We have redesigned this, and the current design is as follows:
Tasks are processed from high to low according to the following precedence: different process instance priority > same process instance priority > task priority within the same process > task submission order within the same process.
The specific implementation parses the priority from the task instance's JSON and then stores the key "process instance priority_process instance id_task priority_task id" in the ZooKeeper task queue. When tasks are fetched from the queue, a simple string comparison yields the task that should be executed first.
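For example, keys built this way can be ordered by plain string comparison, as in the sketch below; the exact field layout and any zero-padding in EasyScheduler may differ, so this is an illustrative assumption.

```java
import java.util.PriorityQueue;

public class TaskQueueKeySketch {

    /** Build a sortable key: process instance priority _ process instance id _ task priority _ task id.
     *  Note: a real implementation would zero-pad numeric fields so string order matches numeric order. */
    static String taskKey(int processPriority, int processInstanceId, int taskPriority, int taskId) {
        return String.format("%d_%d_%d_%d", processPriority, processInstanceId, taskPriority, taskId);
    }

    public static void main(String[] args) {
        PriorityQueue<String> queue = new PriorityQueue<>(); // natural string ordering
        queue.add(taskKey(2, 101, 1, 7));
        queue.add(taskKey(1, 102, 3, 8));  // higher process priority (smaller number) wins
        queue.add(taskKey(1, 102, 2, 9));  // same process: task priority decides next

        // smallest string first, i.e. highest priority first
        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
        // prints: 1_102_2_9, 1_102_3_8, 2_101_1_7
    }
}
```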
Process definition priority is used when some processes need to be handled before others. It can be configured when the process is started manually or when a timed start is scheduled. There are five levels, from high to low: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown below.
<img src="https://analysys.github.io/easyscheduler_docs_cn/images/process_priority.png" alt="Process Priority Configuration" width="40%" />
<img src="https://analysys.github.io/easyscheduler_docs_cn/images/task_priority.png" alt="task priority configuration" width="35%" />
```java
/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {
    ...
    @Override
    protected void append(ILoggingEvent event) {
        if (currentlyActiveFile == null) {
            currentlyActiveFile = getFile();
        }
        String activeFile = currentlyActiveFile;
        // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
        String threadName = event.getThreadName();
        String[] threadNameArr = threadName.split("-");
        // logId = processDefineId_processInstanceId_taskInstanceId
        String logId = threadNameArr[1];
        ...
        super.subAppend(event);
    }
}
```
A log file is generated at a path of the form /process definition id/process instance id/task instance id.log
```java
/**
 * task log filter
 */
public class TaskLogFilter extends Filter<ILoggingEvent> {
    @Override
    public FilterReply decide(ILoggingEvent event) {
        if (event.getThreadName().startsWith("TaskLogInfo-")) {
            return FilterReply.ACCEPT;
        }
        return FilterReply.DENY;
    }
}
```
Starting from scheduling, this article has introduced the architecture principles and implementation ideas of EasyScheduler, a distributed workflow scheduling system for big data. To be continued.