System architecture diagram
Start process activity diagram
### MasterServer

MasterServer adopts a distributed, decentralized design. MasterServer is mainly responsible for DAG task segmentation, task submission monitoring, and monitoring the health status of other MasterServer and WorkerServer instances. When the MasterServer service starts, it registers a temporary (ephemeral) node with ZooKeeper, and fault tolerance is performed by monitoring changes to these temporary ZooKeeper nodes. MasterServer provides monitoring services based on Netty.
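To make the registration step concrete, here is a minimal sketch, assuming Apache Curator as the ZooKeeper client and an illustrative registry path; it is not DolphinScheduler's actual registration code:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class MasterRegistryDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String host = "192.168.0.1:5678"; // illustrative master address
        // EPHEMERAL: ZooKeeper deletes the node automatically when this
        // session dies, which is what lets other servers detect the failure.
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.EPHEMERAL)
              .forPath("/dolphinscheduler/nodes/master/" + host, new byte[0]);

        Thread.sleep(60_000); // keep the session (and the ephemeral node) alive for the demo
    }
}
```

In a real server the session lives as long as the process, so the disappearance of the ephemeral node is a reliable signal that the MasterServer is gone.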
#### The Service Mainly Includes:
- DistributedQuartz, a distributed scheduling component, is mainly responsible for starting and stopping scheduled tasks. When Quartz starts a task, a thread pool inside the Master takes over the follow-up operations of the task;
- MasterSchedulerService is a scanning thread that regularly scans the t_ds_command table in the database and runs different business operations according to the command type (a sketch of this scan loop follows the list);
- WorkflowExecuteRunnable is mainly responsible for DAG task segmentation, task submission monitoring, and the logical processing of different event types;
- TaskExecuteRunnable is mainly responsible for the processing and persistence of tasks; it generates task events and submits them to the event queue of the process instance;
- EventExecuteService is mainly responsible for polling the event queues of the process instances;
- StateWheelExecuteThread is mainly responsible for process instance and task timeouts, task retries, and task-dependency polling; it generates the corresponding process instance or task event and submits it to the event queue of the process instance;
- FailoverExecuteThread is mainly responsible for the logic of Master fault tolerance and Worker fault tolerance;
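As referenced above, here is a hedged sketch of the MasterSchedulerService scan loop: poll t_ds_command, then dispatch on the command type. CommandDao, Command, and the CommandType values are hypothetical stand-ins, not DolphinScheduler's real classes:

```java
import java.util.Optional;

public class MasterSchedulerLoop implements Runnable {

    enum CommandType { START_PROCESS, RECOVER_TOLERANCE_FAULT, REPEAT_RUNNING }

    record Command(long id, CommandType type) {}

    /** Hypothetical DAO over the t_ds_command table. */
    interface CommandDao {
        Optional<Command> pollOneCommand(); // fetch and remove one pending row
    }

    private final CommandDao commandDao;

    MasterSchedulerLoop(CommandDao commandDao) {
        this.commandDao = commandDao;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Optional<Command> next = commandDao.pollOneCommand();
            if (next.isEmpty()) {
                try {
                    Thread.sleep(1000); // nothing pending; scan again shortly
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                continue;
            }
            dispatch(next.get());
        }
    }

    // Different business operations per command type, as described above.
    private void dispatch(Command command) {
        switch (command.type()) {
            case START_PROCESS -> System.out.println("start process instance for command " + command.id());
            case RECOVER_TOLERANCE_FAULT -> System.out.println("recover fault-tolerant instance " + command.id());
            case REPEAT_RUNNING -> System.out.println("re-run process instance " + command.id());
        }
    }
}
```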
### WorkerServer
WorkerServer also adopts a distributed, decentralized design. WorkerServer is mainly responsible for task execution and providing log services.

When the WorkerServer service starts, it registers a temporary node with ZooKeeper and maintains a heartbeat. WorkerServer provides monitoring services based on Netty.
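The "register and keep a heartbeat" pattern can be sketched as follows, again assuming Curator; the path and the timestamp-only payload are illustrative (a real heartbeat would carry more state):

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class WorkerHeartbeatDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String path = "/dolphinscheduler/nodes/worker/192.168.0.2:1234"; // illustrative worker address
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.EPHEMERAL)
              .forPath(path, heartbeat());

        // Refresh the node payload periodically so that a fresh heartbeat
        // timestamp is always readable by whoever inspects the registry.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                client.setData().forPath(path, heartbeat());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 10, 10, TimeUnit.SECONDS);
    }

    private static byte[] heartbeat() {
        return String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8);
    }
}
```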
#### The Service Mainly Includes:
- WorkerManagerThread is mainly responsible for the task queue: it continuously receives tasks from the queue and submits them to a thread pool for processing (a sketch of this loop follows the list);
- TaskExecuteThread is mainly responsible for the task execution process, performing the actual processing of tasks according to the task type;
- RetryReportTaskStatusThread is mainly responsible for regularly polling and reporting the task status to the Master until the Master replies with a status ACK, so that the task status is not lost;
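A sketch of the queue-consumption pattern behind WorkerManagerThread: take tasks from a waiting queue and hand each one to a thread pool. TaskContext, the queue, and the pool size are illustrative, not the real DolphinScheduler types:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkerManagerLoop implements Runnable {

    record TaskContext(int taskInstanceId, String taskType) {}

    private final BlockingQueue<TaskContext> waitSubmitQueue = new LinkedBlockingQueue<>();
    private final ExecutorService execService = Executors.newFixedThreadPool(16); // exec-thread count would be configurable

    public void offer(TaskContext task) {
        waitSubmitQueue.offer(task);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TaskContext task = waitSubmitQueue.take(); // block until a task arrives
                // The actual per-task-type execution happens inside the pool,
                // playing the role of TaskExecuteThread in the description above.
                execService.submit(() -> System.out.println("executing " + task));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```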
### ZooKeeper
The MasterServer and WorkerServer nodes in the system all use the ZooKeeper service for cluster management and fault tolerance. In addition, the system implements event monitoring and distributed locks based on ZooKeeper.
We have also implemented queues based on Redis, but we hope DolphinScheduler depends on as few components as possible, so we finally removed the Redis implementation.
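As an example of the distributed-lock usage, here is a minimal sketch with Curator's InterProcessMutex recipe; the lock path is illustrative:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedLockDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex lock = new InterProcessMutex(client, "/dolphinscheduler/lock/failover");
        lock.acquire(); // blocks until this process holds the ZooKeeper-backed lock
        try {
            System.out.println("only one server runs this section at a time");
        } finally {
            lock.release(); // always release, even if the critical section fails
        }
        client.close();
    }
}
```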
### AlertServer

AlertServer provides alarm services and implements rich alarm methods through alarm plugins.
### API

The API interface layer is mainly responsible for processing requests from the front-end UI layer. The service uniformly provides RESTful APIs to the outside.
### UI

The front-end pages of the system provide its various visual operation interfaces; see the Introduction to Functions section for more.
The centralized design concept is relatively simple. The nodes in the distributed cluster are roughly divided into two roles according to responsibilities:
Problems with the centralized design:
Fault tolerance is divided into service downtime fault tolerance and task retry, and service downtime fault tolerance is further divided into Master fault tolerance and Worker fault tolerance.

The service fault-tolerance design relies on ZooKeeper's Watcher mechanism; the implementation principle is shown in the figure:
- Fault tolerance range: from the perspective of hosts, the fault tolerance range of the Master includes its own host and any node host that no longer exists in the registry, and the entire fault tolerance process is performed under a lock;
- Fault tolerance content: the Master's fault tolerance covers both process instances and task instances. Before fault tolerance, the start time of the instance is compared with the server start-up time, and fault tolerance is skipped for instances that started after the server start time;
- Fault tolerance post-processing: after ZooKeeper Master fault tolerance completes, re-scheduling is done by the Scheduler thread in DolphinScheduler: it traverses the DAG to find the "running" and "submitted successfully" tasks. For "running" tasks, it monitors the status of their task instances; for "submitted successfully" tasks, it needs to determine whether the task already exists in the task queue. If it does, it likewise monitors the status of the task instance; otherwise, it resubmits the task instance.
- Fault tolerance range: from the perspective of process instances, each Master is only responsible for fault tolerance of its own process instances; a lock is taken only in handleDeadServer;
- Fault tolerance content: when the remove event of a Worker node is received, the Master only performs fault tolerance for task instances. Before fault tolerance, the start time of the instance is compared with the server start-up time, and fault tolerance is skipped for instances that started after the server start time;
- Fault tolerance post-processing: once the Master Scheduler thread finds a task instance in the "fault-tolerant" state, it takes over the task and resubmits it.
Note: due to "network jitter", a node may lose its heartbeat with ZooKeeper for a short period of time, causing a remove event for that node. For this situation, we use the simplest approach: once a node's connection with ZooKeeper times out, the Master or Worker service on it is stopped directly.
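The Watcher-based failover trigger and the "stop on timeout" rule above can be sketched together with Curator; the registry path, the failover action, and the shutdown are illustrative placeholders:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DeadServerWatcherDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Watch the worker registry: CHILD_REMOVED fires when a worker's
        // ephemeral node disappears, i.e. its ZooKeeper session has ended.
        PathChildrenCache cache = new PathChildrenCache(client, "/dolphinscheduler/nodes/worker", true);
        cache.getListenable().addListener((c, event) -> {
            switch (event.getType()) {
                case CHILD_REMOVED -> System.out.println(
                        "failover task instances of dead worker " + event.getData().getPath());
                default -> { }
            }
        });
        cache.start();

        // Mirror the note above: if our own session is lost, stop this
        // server rather than risk two servers acting on the same work.
        client.getConnectionStateListenable().addListener((c, state) -> {
            if (state == ConnectionState.LOST) {
                System.out.println("ZooKeeper session lost, stopping this server");
                System.exit(1); // illustrative; a real server would shut down gracefully
            }
        });

        Thread.sleep(Long.MAX_VALUE); // keep the process alive to keep watching
    }
}
```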
Here we must first distinguish the concepts of task failure retry, process failure recovery, and process failure re-run:
Next, to the main point: we divide the task nodes in the workflow into two types.

One is a business task, which corresponds to an actual script or processing command, such as a Shell task, SQL task, or Spark task.

The other is a logical task, which does not run an actual script or command but only performs logical processing of the overall process flow, such as a sub-process task or a dependent task.
A business node can be configured with a number of failed retries: when the task node fails, it automatically retries until it succeeds or the configured retry count is exceeded. Logical nodes do not support failure retry.

If a task in the workflow fails and reaches its maximum retry count, the workflow fails and stops; the failed workflow can then be manually re-run or recovered.
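The retry rule for business nodes can be illustrated with a small self-contained sketch; maxRetryTimes, retryIntervalMs, and runTask stand in for the configured values and the real task execution:

```java
public class RetryPolicyDemo {

    // Stand-in for actually running the task; pretend it succeeds on the 3rd attempt.
    static boolean runTask(int attempt) {
        return attempt >= 2;
    }

    public static void main(String[] args) throws InterruptedException {
        int maxRetryTimes = 3;       // the configured "number of failed retries"
        long retryIntervalMs = 1000; // the configured retry interval

        for (int attempt = 0; attempt <= maxRetryTimes; attempt++) {
            if (runTask(attempt)) {
                System.out.println("task succeeded on attempt " + attempt);
                return;
            }
            if (attempt < maxRetryTimes) {
                System.out.println("task failed, retrying in " + retryIntervalMs + " ms");
                Thread.sleep(retryIntervalMs);
            }
        }
        // Retries exhausted: as described above, the workflow fails and stops,
        // after which it can be manually re-run or recovered.
        System.out.println("task failed after " + maxRetryTimes + " retries");
    }
}
```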
In the early scheduling design, with no priority design and fair scheduling in use, a task submitted first could finish at the same time as a task submitted later, making the priority of processes and tasks ineffective. So we redesigned this, and the following is our current design:

The order of task submission runs from the highest priority to the lowest: the priority of different process instances takes precedence, followed by priorities within the same process instance, then the priority of tasks within the same process, and finally the submission order of tasks within the same process.
<img src="https://user-images.githubusercontent.com/10797147/146744784-eb351b14-c94a-4ed6-8ba4-5132c2a3d116.png" alt="Process priority configuration" width="40%" />
The priority of the task is likewise divided into 5 levels, ordered HIGHEST, HIGH, MEDIUM, LOW, and LOWEST.
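One way to realize the ordering described above is a comparator over a priority queue. The following sketch uses the 5 levels just listed; TaskEntry and the tie-breaking by process instance id and submission order are assumptions for illustration, not DolphinScheduler's actual implementation:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class TaskPriorityQueueDemo {

    // Lower ordinal = higher priority, matching the HIGHEST..LOWEST levels above.
    enum Priority { HIGHEST, HIGH, MEDIUM, LOW, LOWEST }

    record TaskEntry(Priority processInstancePriority,
                     long processInstanceId,
                     Priority taskPriority,
                     long submitOrder) {}

    public static void main(String[] args) {
        Comparator<TaskEntry> order = Comparator
                .comparing(TaskEntry::processInstancePriority)   // priority across process instances
                .thenComparingLong(TaskEntry::processInstanceId) // assumed tie-break between equal-priority instances
                .thenComparing(TaskEntry::taskPriority)          // priority of tasks within the same process
                .thenComparingLong(TaskEntry::submitOrder);      // finally, task submission order

        PriorityBlockingQueue<TaskEntry> queue = new PriorityBlockingQueue<>(11, order);
        queue.add(new TaskEntry(Priority.MEDIUM, 1, Priority.HIGH, 1));
        queue.add(new TaskEntry(Priority.HIGHEST, 2, Priority.LOW, 2));
        System.out.println(queue.poll()); // the task of the HIGHEST-priority instance comes out first
    }
}
```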
Each task instance gets its own log file. This is implemented with a custom Logback configuration: a SiftingAppender discriminates on taskAppId and creates one FileAppender per task instance, writing to ${log.base}/${taskAppId}.log, while a custom converter (SensitiveDataConverter) renders the %message field:

```xml
<conversionRule conversionWord="message" converterClass="org.apache.dolphinscheduler.common.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
    <filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
    <Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
        <key>taskAppId</key>
        <logBase>${log.base}</logBase>
    </Discriminator>
    <sift>
        <appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
            <file>${log.base}/${taskAppId}.log</file>
            <encoder>
                <pattern>
                    [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - %message%n
                </pattern>
                <charset>UTF-8</charset>
            </encoder>
            <append>true</append>
        </appender>
    </sift>
</appender>
```
From the perspective of scheduling, this article has given a preliminary introduction to the architecture principles and implementation ideas of the big data distributed workflow scheduling system DolphinScheduler. To be continued.