从数据库中取出 job 队列,为每个 job 创建新的进程来执行,并等待该 job 结束。
代码如下:
- <?php defined('SYSPATH') OR die('No direct access allowed.');
/**
 * CLI-only controller that drains the `job_queue` table.
 *
 * action_run() polls the table for approved, pending rows, and for each row
 * forks a child that replaces itself (pcntl_exec) with a fresh PHP process
 * running the job's URI. The parent waits for that child and records the
 * outcome in the row's `status` column.
 */
class Controller_Jobs extends Controller_Base
{
    /** `status` value that action_run() selects as ready to run. */
    const STATUS_PENDING = '1';

    /** `status` value written just before the job's child process is forked. */
    const STATUS_RUNNING = '2';

    /** `status` value written when the child exited normally with code 0. */
    const STATUS_DONE = '999';

    /** `status` value written when the child did not finish successfully. */
    const STATUS_FAILED = '-1';

    /**
     * Refuses to serve anything that is not a command-line request.
     */
    public function before()
    {
        parent::before();
        if (Request::$protocol !== "cli") {
            die("Only cli allowed!\n");
        }
    }

    /**
     * Post-action hook; currently only delegates to the parent.
     */
    public function after()
    {
        parent::after();
        //do some cleaning tasks
    }

    /**
     * Replaces the current process with the configured PHP binary running
     * the application index for the given job URI.
     *
     * @param string     $joburi URI passed to the new process as --uri=...
     * @param array|null $paras  extra options, each passed as --name=value;
     *                           anything that is not an array is ignored
     * @return bool only returns (with false) when pcntl_exec() failed; on
     *              success the process image is replaced and nothing returns
     */
    private function _execJobCommand($joburi, $paras)
    {
        $php_exec = Kohana::config("picsou.php_exec");
        $command_args = array(APPINDEX, "--uri=".$joburi);

        // json_decode() yields null for empty or malformed JSON; iterating
        // over that would raise a warning, so such jobs run with no options.
        if (is_array($paras)) {
            foreach ($paras as $para => $value) {
                $command_args[] = "--".$para."=".$value;
            }
        }

        echo "exec commmand:".$php_exec."\n";
        // Anything still sitting in the output buffer is lost once the
        // process image is replaced, so push it out first.
        $this->_flushOutput();

        return pcntl_exec($php_exec, $command_args);
    }

    /**
     * Runs queued jobs until the queue is empty and at least 10 jobs have
     * been processed by this invocation; sleeps 5 seconds between polls
     * while the queue is empty.
     */
    public function action_run()
    {
        $requestCount = 0;
        while (true) {
            $sql = "select * from job_queue where status='1' and approved='1' order by id";
            $jobs = DB::query(Database::SELECT, $sql)->execute()->as_array();

            if (!$jobs) {
                if ($requestCount >= 10) {
                    echo "Have a rest, I have processed 10 jobs\n";
                    exit;
                }
                //no job to run
                $this->_flushOutput();
                sleep(5);
                continue;
            }

            foreach ($jobs as $job) {
                $requestCount++;
                $this->_runJob($job);
            }
        }
    }

    /**
     * Executes one queue row in a child process and records the outcome.
     *
     * @param array $job row from job_queue; reads `id`, `job_uri`, `paras`
     */
    private function _runJob($job)
    {
        $this->_setJobStatus($job['id'], self::STATUS_RUNNING);

        $job_pid = pcntl_fork();
        if ($job_pid === -1) {
            // Put the row back in the queue; otherwise it would stay marked
            // as running forever and never be selected again.
            $this->_setJobStatus($job['id'], self::STATUS_PENDING);
            die("Could not fork Child");
        }

        if ($job_pid === 0) {
            // Child: on success this call never returns.
            $this->_execJobCommand($job['job_uri'], json_decode($job['paras'], true));
            // Reaching this line means pcntl_exec() failed. Exit non-zero so
            // the parent records a failure instead of a completed job.
            // NOTE(review): the child inherits the parent's DB connection;
            // confirm that exiting here does not close it for the parent.
            echo "Child exec failed\n";
            exit(1);
        }

        // Parent: block until the child has actually terminated.
        echo "Waiting for job\n";
        $this->_flushOutput();

        $status = 0;
        // No WUNTRACED: a stopped child has not finished its job.
        $child_pid = pcntl_waitpid($job_pid, $status, 0);
        echo "waitpid end:".$status."\n";

        // $status is an encoded wait status, not an exit code; decode it.
        $succeeded = $child_pid === $job_pid
            && pcntl_wifexited($status)
            && pcntl_wexitstatus($status) === 0;

        if ($succeeded) {
            $this->_setJobStatus($job['id'], self::STATUS_DONE);
            echo "Child Finished\n";
        } else {
            $this->_setJobStatus($job['id'], self::STATUS_FAILED);
            echo "Child Failed\n";
        }
        $this->_flushOutput();
    }

    /**
     * Writes a new `status` value for one job_queue row.
     *
     * @param mixed  $id     primary key of the row
     * @param string $status one of the STATUS_* constants
     */
    private function _setJobStatus($id, $status)
    {
        DB::update('job_queue')
            ->set(array('status' => $status))
            ->where('id', '=', $id)
            ->execute();
    }

    /**
     * Flushes the active output buffer, if any. ob_flush() raises a notice
     * when no buffer is active, hence the guard.
     */
    private function _flushOutput()
    {
        if (ob_get_level() > 0) {
            ob_flush();
        }
    }
}
复制代码
|