• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • DolphinScheduler1.2源碼開發環境搭建及架構分析

    標簽: 大數據  java

    一、本地開發環境的搭建

    1、準備工作:

    1.1、環境:

    后端

    • JDK(1.8+)
    • Maven(3.3+)
      最好在本地解壓一個hadoop并配上環境變量(不配好像會報一個winutils…的問題)

    前端

    • node(Node包下載 (注意版本 8.9.4) https://nodejs.org/download/release/v8.9.4/)
      本地環境變量的配置:

    1.2、源碼下載

    1. git branch -a #查看分支
    2. git checkout dev-db #切換到dev-db分支
    3. git pull #同步分支
      #由于項目前端的日志模塊,使用了gRPC調用后端,所以需要先編譯項目。
    4. 編譯項目:mvn -U clean package -Prelease -Dmaven.test.skip=true -Dmaven.javadoc.skip=true(1.2之后的版本)
      (1.2之前:mvn -U clean package assembly:assembly -Dmaven.test.skip=true
      *如果dolpinscheduler-ui模塊編譯不過,可以直接去pom.xml文件中去掉ui模塊,跳過編譯

    2、搭建后端:

    2.1、修改 pom.xml 文件 [采用 mysql 數據庫 ]

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.2、修改配置

    1、dolphinscheduler-common的hadoop.properties主要修改的配置項有:

    	fs.defaultFS=hdfs://xxxx
    	yarn.resourcemanager.ha.rm.ids=192.168.0.244,192.168.0.245
    
    • 1
    • 2

    2、dolphinscheduler-common的common.properties主要修改的配置項有:

    	res.upload.startup.type=HDFS
    
    • 1

    3、dolphinscheduler-common的quartz.properties:ds1.2數據庫默認使用postgresql,所以如果使用mysql,需要修改mysql的配置

    	org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    	org.quartz.dataSource.myDs.driver = com.mysql.jdbc.Driver
    
    • 1
    • 2

    4、dolphinscheduler-common的zookeeper.properties:配置連接及路徑
    5、dolphinscheduler-dao的application-dao.properties修改spring.datasource的相關配置

    2.3、數據庫初始化

    1. 創建ds數據庫CREATE DATABASE dolphinscheduler
    2. 創建表和初始化數據: 修改dao模塊resource目錄下application.properties文件中的數據庫配置信息,然后執行org.apache.dolphinscheduler.dao.upgrade.shell.CreateDolphinScheduler的類,運行完,刷新數據庫,表和數據都有了。

    2.4、運行前的配置修改

    2.4.1、啟動MasterServer

    org.apache.dolphinscheduler.server.master.MasterServer類main函數增加如下代碼:

    System.setProperty("spring.profiles.active","master");
    
    • 1

    修改server模塊resources目錄下master_logback.xml文件,增加以下代碼:

    <root level="INFO">
    	 <appender-ref ref="MASTERLOGFILE"/>
    	 <!-- 增加日志到控制臺-->
    	 <appender-ref ref="STDOUT"/>
    </root>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后執行MasterServer即可。

    2.4.2、啟動WorkerServer

    org.apache.dolphinscheduler.server.worker.WorkerServer類main函數增加如下代碼:

    System.setProperty("spring.profiles.active","worker");
    
    • 1

    修改server模塊resources目錄下worker_logback.xml文件,增加以下代碼:

    <root level="INFO">
    	 <appender-ref ref="TASKLOGFILE"/>
    	 <appender-ref ref="WORKERLOGFILE"/>
    	 <!-- 增加日志到控制臺-->
    	 <appender-ref ref="STDOUT"/>
    </root>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    然后執行WorkerServer即可。

    2.4.3、啟動CombinedApplicationServer的mian函數下增加

    System.setProperty("spring.profiles.active","combined");
    
    • 1

    2.4.4、錯誤收集:

    1、如果報找不到/etc/passwd文件的錯誤,則定位到指定位置OSUtils.java,從服務器上下載一個passwd文件到本地,修改路徑
    2、如果無法獲取fs文件系統,是因為之前common模塊下的hadoop.properties中是配置的
    fs.defaultFS=hdfs://xxxx 命名空間,會顯示無法解析xxxx,所以將core-site.xml和hdfs-site.xml拷貝到common的resource模塊下。

    3、搭建前端

    3.1、編譯

    cd apache-dolphinscheduler-1.2.1-src/dolphinscheduler-ui
    npm install node-sass --unsafe-perm(不一定每次都會安裝成功,看人品吧)
    npm install [ 慢的話用這個 npm install --registry https://registry.npm.taobao.org]

    3.2、修改配置

    /workspace/apache-dolphinscheduler-1.2.1-src/dolphinscheduler-ui/.env下的
    API_BASE = http://127.0.0.1:12345
    DEV_HOST = 127.0.0.1

    3.3啟動

    npm run dev

    4、觀看效果:

    地址:127.0.0.1:8888
    用戶名\密碼:admin\dolphinscheduler123
    在這里插入圖片描述

    二、源碼結構分析

    1、DolphinScheduler的架構圖

    在這里插入圖片描述

    架構說明:

    1.1、Quartz

    SchedulerController
    createSchedule:將調度的相關信息插入t_ds_schedules表中
    online:此時才會將狀態為online的流程加入調度
    	    schedulerService.setScheduleState(loginUser, projectName, id, ReleaseState.ONLINE)
             case ONLINE:setSchedule(project.getId(), id);
     	                                QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,schedule.getCrontab(), dataMap)
    *org.apache.dolphinscheduler.server.quartz.QuartzExecutors,是內部對Quartz進行的一個封裝,僅僅提供增加、刪除作業的基礎功能。其作業的狀態等信息保存在數據庫中以QRTZ_開頭的表。
    為了將實際作業的定義與Quartz隔離,抽象了一個ProcessScheduleJob類,用它來創建JobDetail。該類僅僅是根據流程定義的定時等信息創建了一個CommandType.SCHEDULER類型的Command對象,然后插入了數據庫,并沒有的執行任務的具體邏輯。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1.2、MasterSchedulerThread

    對應架構圖中的CommandScanner
    這是一個掃描線程,定時掃描數據庫中的 t_ds_command 表,根據不同的命令類型進行不同的業務操作。掃描的SQL如下:
    
    • 1
    • 2
    select command.* from t_ds_command command
        join t_ds_process_definition definition on command.process_definition_id = definition.id
        where definition.release_state = 1 AND definition.flag = 1
        order by command.update_time asc
        limit 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    定時的默認是1秒,由Constants.SLEEP_TIME_MILLIS設置。Command的創建與執行是異步的。
    MasterSchedulerThread類查詢到一個Comamand后將其轉化為一個ProcessInstance,交由MasterExecThread進行執行。
    MasterSchedulerThread功能比較簡單,就是負責銜接Quartz創建的Command,一個橋梁的作用:
    即如果狀態設置為online后,會將scheduler表中的記錄添加到quartz中,quartz根據scheduler中設置的時間觸發,觸發后會向t_ds_command中添加一條記錄,每一秒MasterSchedulerThread會掃描t_ds_command中的記錄封裝成ProcessInstance,交由MasterExecThread進行執行
    
    • 1
    • 2
    • 3
    • 4

    1.3、MasterExecThread

    負責執行ProcessInstance,功能主要是DAG任務切分、任務提交監控等其他邏輯處理。
    DAG切割:首先找入度為0的任務(也就是沒有任務依賴),放到準備提交隊列;任務執行成功后,掃描后續的任務,如果該任務的所有依賴都成功,則執行該任務;循環處理。
    MasterExecThread隨著DAG中所有任務的執行結束而結束。在MasterExecThread中,也沒有執行具體的任務邏輯,只是創建了一個MasterTaskExecThread負責任務的“執行”。
    
    • 1
    • 2
    • 3

    關鍵代碼如下:

    // execute flow
    private void executeProcess() throws Exception {
            prepareProcess();//init task queue and generate process dag
            runProcess();//submit and watch the tasks, until the work flow stop,將dag中的task添加到不同的taskLis中
                         //先檢查資源是否充足,submitStandByTask。submitTaskExec(task);將任務給MasterTaskExecThread執行
            endProcess();//updateProcessInstance and createRecoveryWaitingThreadCommand(對于子流程)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.4、MasterTaskExecThread

    其功能主要就是負責任務的持久化,簡單來說就是把TaskInstacne信息保存到數據庫中,同時如果一個任務滿足執行條件,也會把任務ID提交到TaskQueue中的。
    這個線程會每隔1秒(Constants.SLEEP_TIME_MILLIS設置)查詢作業的狀態,直到作業執行完畢(不管是成功還是失敗

    public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
            logger.info("start submit task : {}, instance id:{}, state: {}, ",
                    taskInstance.getName(), processInstance.getId(), processInstance.getState() );
            processInstance = this.findProcessInstanceDetailById(processInstance.getId());
            //submit to mysql
            TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
            if(task.isSubProcess() && !task.getState().typeIsFinished()){
                ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
                TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
                Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
                Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
                createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
            }else if(!task.getState().typeIsFinished()){
                //submit to task queue
                task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
                submitTaskToQueue(task);
            }
            logger.info("submit task :{} state:{} complete, instance id:{} state: {}  ",
                    taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
            return task;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    1.5、TaskQueue

    架構圖中Master/Worker通信的重要渠道,它把待執行的隊列放到了TaskQueue,由Worker獲取到之后,執行具體的業務邏輯。根據技術架構介紹,這個TaskQueue是由Zookeeper實現。由此也可以看出,Master、Worker是沒有直接的物理交互的。
    關鍵代碼如下:

    protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
    public static ITaskQueue getTaskQueueInstance() {
        String queueImplValue = CommonUtils.getQueueImplValue();
        if (StringUtils.isNotBlank(queueImplValue)) {
            logger.info("task queue impl use zookeeper ");
            return TaskQueueZkImpl.getInstance();
        }else{
          logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
          System.exit(-1);
        }
        return null;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    1.6、FetchTaskThread

    org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread循環從TaskQueue中獲取任務,并根據不同任務類型調用TaskScheduleThread對應執行器。每次循環依舊休眠1秒。
    List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
    // submit task
    workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
    
    • 1
    • 2
    • 3
    • 4

    1.7、TaskScheduleThread

    org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread負責任務的具體執行。構造獲取任務相關的文件、參數等信息,創建Process類,執行對應的命令行,然后等待其執行完畢,獲取標準輸出、標準錯誤輸出、返回碼等信息
    // update task state is running according to task type
    updateTaskState(taskInstance.getTaskType());
    //根據不同的task類型,產生不同的task實例類
    task = TaskManager.newTask(taskInstance.getTaskType(),taskProps,taskLogger);
    // task init
    task.init();
    // task handle
    task.handle();
    // task result process
    task.after();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    1.8、LoggerServer

    org.apache.dolphinscheduler.server.rpc.LoggerServer跟Worker、Master屬于同一級別,都是需要單獨啟動的進程。這就是一個RPC服務器,提供日志分片查看、刷新和下載等功能。

    2、項目架構

    2.1、模塊:

    1. dolphinscheduler-ui 前端頁面模塊
    2. dolphinscheduler-server 核心模塊。包括master/worker等功能
    3. dolphinscheduler-common 公共模塊。公共方法或類
    4. dolphinscheduler-api Restful接口。前后端交互層,與master/worker交互等功能
    5. dolphinscheduler-dao 數據操作層。實體定義、數據存儲
    6. dolphinscheduler-alert 預警模塊。與預警相關的方法、功能
    7. dolphinscheduler-rpc 日志查看。提供日志實時查看rpc功能
    8. dolphinscheduler-dist 與編譯、分發相關的模塊。沒有具體邏輯功能
      2.2、源碼分析
      從與UI交互的API模塊開始著手,重點分析核心功能。(具體API文檔見官網)

    詳見:https://www.cnblogs.com/gabry/p/12162272.html

    版權聲明:本文為zhaxiaodong原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/zhaxiaodong/article/details/105288206

    智能推薦

    Activiti架構分析及源碼詳解

    工作流引擎,應用于解決流程審批和流程編排方面等問題,有效的提供了擴展性的支撐。而目前來說,工作流領域也有了相對通行化的標準規范,也就是 BPMN2.0。支持這個規范的開源引擎主要有:Activiti,flowable,Jbpm4 等。本文著重對 Activiti 的架構設計進行分析和梳理,同時對流程啟動和原子操作的相關代碼進行完整走讀。 本文的閱讀對象需要對 Activiti 有一定的理解并且已經...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    猜你喜歡

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    統計學習方法 - 樸素貝葉斯

    引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...

    styled-components —— React 中的 CSS 最佳實踐

    https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...

    基于TCP/IP的網絡聊天室用Java來實現

    基于TCP/IP的網絡聊天室實現 開發工具:eclipse 開發環境:jdk1.8 發送端 接收端 工具類 運行截圖...

    精品国产乱码久久久久久蜜桃不卡