DolphinScheduler1.2源碼開發環境搭建及架構分析
一、本地開發環境的搭建
1、準備工作:
1.1、環境:
后端
- JDK(1.8+)
- Maven(3.3+)
最好在本地解壓一個hadoop并配上環境變量(不配好像會報一個winutils…的問題)
前端
- node(Node包下載 (注意版本 8.9.4) https://nodejs.org/download/release/v8.9.4/)
本地環境變量的配置:
1.2、源碼下載
git branch -a
#查看分支git checkout dev-db
#切換到dev-db分支git pull
#同步分支
#由于項目前端的日志模塊,使用了gRPC調用后端,所以需要先編譯項目。- 編譯項目:
mvn -U clean package -Prelease -Dmaven.test.skip=true -Dmaven.javadoc.skip=true
(1.2之后的版本)
(1.2之前:mvn -U clean package assembly:assembly -Dmaven.test.skip=true
)
*如果dolpinscheduler-ui模塊編譯不過,可以直接去pom.xml文件中去掉ui模塊,跳過編譯
2、搭建后端:
2.1、修改 pom.xml 文件 [采用 mysql 數據庫 ]
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.version}</version>
<!--<scope>test</scope>-->
</dependency>
- 1
- 2
- 3
- 4
- 5
- 6
2.2、修改配置
1、dolphinscheduler-common的hadoop.properties
主要修改的配置項有:
fs.defaultFS=hdfs://xxxx
yarn.resourcemanager.ha.rm.ids=192.168.0.244,192.168.0.245
- 1
- 2
2、dolphinscheduler-common的common.properties
主要修改的配置項有:
res.upload.startup.type=HDFS
- 1
3、dolphinscheduler-common的quartz.properties
:ds1.2數據庫默認使用postgresql,所以如果使用mysql,需要修改mysql的配置
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.dataSource.myDs.driver = com.mysql.jdbc.Driver
- 1
- 2
4、dolphinscheduler-common的zookeeper.properties
:配置連接及路徑
5、dolphinscheduler-dao的application-dao.properties
修改spring.datasource的相關配置
2.3、數據庫初始化
- 創建ds數據庫
CREATE DATABASE dolphinscheduler
- 創建表和初始化數據: 修改dao模塊resource目錄下
application.properties
文件中的數據庫配置信息,然后執行org.apache.dolphinscheduler.dao.upgrade.shell.CreateDolphinScheduler
的類,運行完,刷新數據庫,表和數據都有了。
2.4、運行前的配置修改
2.4.1、啟動MasterServer
org.apache.dolphinscheduler.server.master.MasterServer類main函數增加如下代碼:
System.setProperty("spring.profiles.active","master");
- 1
修改server模塊resources目錄下master_logback.xml文件,增加以下代碼:
<root level="INFO">
<appender-ref ref="MASTERLOGFILE"/>
<!-- 增加日志到控制臺-->
<appender-ref ref="STDOUT"/>
</root>
- 1
- 2
- 3
- 4
- 5
然后執行MasterServer即可。
2.4.2、啟動WorkerServer
org.apache.dolphinscheduler.server.worker.WorkerServer類main函數增加如下代碼:
System.setProperty("spring.profiles.active","worker");
- 1
修改server模塊resources目錄下worker_logback.xml文件,增加以下代碼:
<root level="INFO">
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/>
<!-- 增加日志到控制臺-->
<appender-ref ref="STDOUT"/>
</root>
- 1
- 2
- 3
- 4
- 5
- 6
然后執行WorkerServer即可。
2.4.3、啟動CombinedApplicationServer的mian函數下增加
System.setProperty("spring.profiles.active","combined");
- 1
2.4.4、錯誤收集:
1、如果報找不到/etc/passwd文件的錯誤,則定位到指定位置OSUtils.java,從服務器上下載一個passwd文件到本地,修改路徑
2、如果無法獲取fs文件系統,是因為之前common模塊下的hadoop.properties中是配置的
fs.defaultFS=hdfs://xxxx 命名空間,會顯示無法解析xxxx,所以將core-site.xml和hdfs-site.xml拷貝到common的resource模塊下。
3、搭建前端
3.1、編譯
cd apache-dolphinscheduler-1.2.1-src/dolphinscheduler-ui
npm install node-sass --unsafe-perm
(不一定每次都會安裝成功,看人品吧)
npm install
[ 慢的話用這個 npm install --registry https://registry.npm.taobao.org
]
3.2、修改配置
/workspace/apache-dolphinscheduler-1.2.1-src/dolphinscheduler-ui/.env
下的
API_BASE = http://127.0.0.1:12345
DEV_HOST = 127.0.0.1
3.3啟動
npm run dev
4、觀看效果:
地址:127.0.0.1:8888
用戶名\密碼:admin\dolphinscheduler123
二、源碼結構分析
1、DolphinScheduler的架構圖
架構說明:
1.1、Quartz
SchedulerController
createSchedule:將調度的相關信息插入t_ds_schedules表中
online:此時才會將狀態為online的流程加入調度
schedulerService.setScheduleState(loginUser, projectName, id, ReleaseState.ONLINE)
case ONLINE:setSchedule(project.getId(), id);
QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,schedule.getCrontab(), dataMap)
*org.apache.dolphinscheduler.server.quartz.QuartzExecutors,是內部對Quartz進行的一個封裝,僅僅提供增加、刪除作業的基礎功能。其作業的狀態等信息保存在數據庫中以QRTZ_開頭的表。
為了將實際作業的定義與Quartz隔離,抽象了一個ProcessScheduleJob類,用它來創建JobDetail。該類僅僅是根據流程定義的定時等信息創建了一個CommandType.SCHEDULER類型的Command對象,然后插入了數據庫,并沒有的執行任務的具體邏輯。
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
1.2、MasterSchedulerThread
對應架構圖中的CommandScanner
這是一個掃描線程,定時掃描數據庫中的 t_ds_command 表,根據不同的命令類型進行不同的業務操作。掃描的SQL如下:
- 1
- 2
select command.* from t_ds_command command
join t_ds_process_definition definition on command.process_definition_id = definition.id
where definition.release_state = 1 AND definition.flag = 1
order by command.update_time asc
limit 1
- 1
- 2
- 3
- 4
- 5
定時的默認是1秒,由Constants.SLEEP_TIME_MILLIS設置。Command的創建與執行是異步的。
MasterSchedulerThread類查詢到一個Comamand后將其轉化為一個ProcessInstance,交由MasterExecThread進行執行。
MasterSchedulerThread功能比較簡單,就是負責銜接Quartz創建的Command,一個橋梁的作用:
即如果狀態設置為online后,會將scheduler表中的記錄添加到quartz中,quartz根據scheduler中設置的時間觸發,觸發后會向t_ds_command中添加一條記錄,每一秒MasterSchedulerThread會掃描t_ds_command中的記錄封裝成ProcessInstance,交由MasterExecThread進行執行
- 1
- 2
- 3
- 4
1.3、MasterExecThread
負責執行ProcessInstance,功能主要是DAG任務切分、任務提交監控等其他邏輯處理。
DAG切割:首先找入度為0的任務(也就是沒有任務依賴),放到準備提交隊列;任務執行成功后,掃描后續的任務,如果該任務的所有依賴都成功,則執行該任務;循環處理。
MasterExecThread隨著DAG中所有任務的執行結束而結束。在MasterExecThread中,也沒有執行具體的任務邏輯,只是創建了一個MasterTaskExecThread負責任務的“執行”。
- 1
- 2
- 3
關鍵代碼如下:
// execute flow
private void executeProcess() throws Exception {
prepareProcess();//init task queue and generate process dag
runProcess();//submit and watch the tasks, until the work flow stop,將dag中的task添加到不同的taskLis中
//先檢查資源是否充足,submitStandByTask。submitTaskExec(task);將任務給MasterTaskExecThread執行
endProcess();//updateProcessInstance and createRecoveryWaitingThreadCommand(對于子流程)
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
1.4、MasterTaskExecThread
其功能主要就是負責任務的持久化,簡單來說就是把TaskInstacne信息保存到數據庫中,同時如果一個任務滿足執行條件,也會把任務ID提交到TaskQueue中的。
這個線程會每隔1秒(Constants.SLEEP_TIME_MILLIS設置)查詢作業的狀態,直到作業執行完畢(不管是成功還是失敗
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
logger.info("start submit task : {}, instance id:{}, state: {}, ",
taskInstance.getName(), processInstance.getId(), processInstance.getState() );
processInstance = this.findProcessInstanceDetailById(processInstance.getId());
//submit to mysql
TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
if(task.isSubProcess() && !task.getState().typeIsFinished()){
ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
}else if(!task.getState().typeIsFinished()){
//submit to task queue
task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
submitTaskToQueue(task);
}
logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
return task;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
1.5、TaskQueue
架構圖中Master/Worker通信的重要渠道,它把待執行的隊列放到了TaskQueue,由Worker獲取到之后,執行具體的業務邏輯。根據技術架構介紹,這個TaskQueue是由Zookeeper實現。由此也可以看出,Master、Worker是沒有直接的物理交互的。
關鍵代碼如下:
protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
logger.info("task queue impl use zookeeper ");
return TaskQueueZkImpl.getInstance();
}else{
logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
System.exit(-1);
}
return null;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
1.6、FetchTaskThread
org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread循環從TaskQueue中獲取任務,并根據不同任務類型調用TaskScheduleThread對應執行器。每次循環依舊休眠1秒。
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
- 1
- 2
- 3
- 4
1.7、TaskScheduleThread
org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread負責任務的具體執行。構造獲取任務相關的文件、參數等信息,創建Process類,執行對應的命令行,然后等待其執行完畢,獲取標準輸出、標準錯誤輸出、返回碼等信息
// update task state is running according to task type
updateTaskState(taskInstance.getTaskType());
//根據不同的task類型,產生不同的task實例類
task = TaskManager.newTask(taskInstance.getTaskType(),taskProps,taskLogger);
// task init
task.init();
// task handle
task.handle();
// task result process
task.after();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
1.8、LoggerServer
org.apache.dolphinscheduler.server.rpc.LoggerServer跟Worker、Master屬于同一級別,都是需要單獨啟動的進程。這就是一個RPC服務器,提供日志分片查看、刷新和下載等功能。
2、項目架構
2.1、模塊:
- dolphinscheduler-ui 前端頁面模塊
- dolphinscheduler-server 核心模塊。包括master/worker等功能
- dolphinscheduler-common 公共模塊。公共方法或類
- dolphinscheduler-api Restful接口。前后端交互層,與master/worker交互等功能
- dolphinscheduler-dao 數據操作層。實體定義、數據存儲
- dolphinscheduler-alert 預警模塊。與預警相關的方法、功能
- dolphinscheduler-rpc 日志查看。提供日志實時查看rpc功能
- dolphinscheduler-dist 與編譯、分發相關的模塊。沒有具體邏輯功能
2.2、源碼分析
從與UI交互的API模塊開始著手,重點分析核心功能。(具體API文檔見官網)
詳見:https://www.cnblogs.com/gabry/p/12162272.html
智能推薦
Activiti架構分析及源碼詳解
工作流引擎,應用于解決流程審批和流程編排方面等問題,有效的提供了擴展性的支撐。而目前來說,工作流領域也有了相對通行化的標準規范,也就是 BPMN2.0。支持這個規范的開源引擎主要有:Activiti,flowable,Jbpm4 等。本文著重對 Activiti 的架構設計進行分析和梳理,同時對流程啟動和原子操作的相關代碼進行完整走讀。 本文的閱讀對象需要對 Activiti 有一定的理解并且已經...
freemarker + ItextRender 根據模板生成PDF文件
1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...
電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!
Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...
requests實現全自動PPT模板
http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...
猜你喜歡
Linux C系統編程-線程互斥鎖(四)
互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...
統計學習方法 - 樸素貝葉斯
引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...
styled-components —— React 中的 CSS 最佳實踐
https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...