001/* 002 * Stallion Core: A Modern Web Framework 003 * 004 * Copyright (C) 2015 - 2016 Stallion Software LLC. 005 * 006 * This program is free software: you can redistribute it and/or modify it under the terms of the 007 * GNU General Public License as published by the Free Software Foundation, either version 2 of 008 * the License, or (at your option) any later version. This program is distributed in the hope that 009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of 010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public 011 * License for more details. You should have received a copy of the GNU General Public License 012 * along with this program. If not, see <http://www.gnu.org/licenses/gpl-2.0.html>. 013 * 014 * 015 * 016 */ 017 018package io.stallion.jobs; 019 020import io.stallion.exceptions.CommandException; 021import io.stallion.exceptions.ConfigException; 022import io.stallion.exceptions.UsageException; 023import io.stallion.services.Log; 024import io.stallion.settings.Settings; 025import org.apache.commons.lang3.concurrent.BasicThreadFactory; 026 027import static io.stallion.utils.Literals.*; 028 029import java.time.ZonedDateTime; 030import java.util.*; 031import java.util.concurrent.Executor; 032import java.util.concurrent.Executors; 033import java.util.concurrent.PriorityBlockingQueue; 034 035/** 036 * A service that handles running recurring jobs in the background. 037 */ 038public class JobCoordinator extends Thread { 039 040 private static JobCoordinator _instance; 041 042 public static JobCoordinator instance() { 043 if (_instance == null) { 044 _instance = new JobCoordinator(); 045 _instance.setName("stallion-job-coordinator"); 046 //throw new ConfigException("You tried to access the JobCoordinator.instance(), but startUp() method was never called, jobs are not running."); 047 } 048 return _instance; 049 } 050 051 public static void startUp() { 052 instance().start(); 053 } 054 055 056 /** 057 * Creates an instance, but does not start it. 058 * Used by unittests. 059 */ 060 public static void initForTesting() { 061 if (_instance != null) { 062 throw new ConfigException("You cannot startup two job coordinators!"); 063 } 064 _instance = new JobCoordinator(); 065 _instance.synchronousMode = true; 066 } 067 068 public static void shutdown() { 069 if (_instance == null) { 070 return; 071 } 072 _instance.shouldShutDown = true; 073 if (_instance.isAlive()) { 074 _instance.interrupt(); 075 try { 076 _instance.join(); 077 } catch(InterruptedException e) { 078 079 } 080 } 081 _instance = null; 082 } 083 084 /* Instance methods */ 085 private JobCoordinator() { 086 queue = new PriorityBlockingQueue<>(); 087 BasicThreadFactory factory = new BasicThreadFactory.Builder() 088 .namingPattern("stallion-job-execution-thread-%d") 089 .build(); 090 // Create an executor service for single-threaded execution 091 pool = Executors.newFixedThreadPool(25, factory); 092 registeredJobs = new HashSet<>(); 093 }; 094 095 private Executor pool; 096 private boolean shouldShutDown = false; 097 private PriorityBlockingQueue<JobDefinition> queue; 098 private Set<String> registeredJobs; 099 private Map<String, Long> lastRanAtByJobName = new HashMap<>(); 100 private Map<String, JobDefinition> jobByName = map(); 101 private Boolean synchronousMode = false; 102 103 @Override 104 public void run() { 105 106 if (Settings.instance().getLocalMode() && ("prod".equals(Settings.instance().getEnv()) || "qa".equals(Settings.instance().getEnv()))) { 107 Log.info("Running localMode, environment is QA or PROD, thus not running jobs. Do not want to run production jobs locally!"); 108 return; 109 } 110 111 while (!shouldShutDown) { 112 try { 113 executeJobsForCurrentTime(utcNow()); 114 } catch(Exception e) { 115 Log.exception(e, "Error executing jobs in the main job coordinator loop!!!"); 116 } 117 118 119 // Find the seconds until the next minute, sleep until 10 seconds into the next minute 120 ZonedDateTime now = utcNow(); 121 ZonedDateTime nextMinute = now.withSecond(0).plusMinutes(1); 122 Long waitSeconds = (nextMinute.toInstant().getEpochSecond() - now.toInstant().getEpochSecond()) + 10; 123 try { 124 Thread.sleep(waitSeconds*1000); 125 } catch (InterruptedException e) { 126 break; 127 } 128 129 } 130 131 // Shut down the thread pool 132 // Wait for everything to exit 133 134 135 } 136 137 /** 138 * This really should not be a public method, but needs to be so unittests can access it. 139 * This is called by the main run() loop to actually find jobs to run and trigger their running 140 * @param now 141 * @throws ClassNotFoundException 142 * @throws IllegalAccessException 143 * @throws InstantiationException 144 */ 145 public void executeJobsForCurrentTime(ZonedDateTime now) throws ClassNotFoundException, IllegalAccessException, InstantiationException { 146 now = now.withSecond(0).withNano(0); 147 Log.finest("Checking for jobs to execute this period"); 148 for (JobStatus jobStatus: JobStatusController.instance().findJobsForPeriod(now)) { 149 JobDefinition definition = JobCoordinator.instance().getJobDefinition(jobStatus.getName()); 150 lastRanAtByJobName.put(definition.getName(), now.toInstant().toEpochMilli()); 151 Log.info("Dispatching job {0}", definition.getName()); 152 // Now start-up a thread to actually run the job 153 Class<? extends Job> jobClass = definition.getJobClass(); 154 Job job = jobClass.newInstance(); 155 JobInstanceDispatcher dispatcher = new JobInstanceDispatcher(jobStatus, definition, job, false, now); 156 if (synchronousMode) { 157 dispatcher.run(); 158 } else { 159 pool.execute(dispatcher); 160 } 161 162 } 163 } 164 165 public void forceRunJob(String jobName, boolean forceEvenIfLocked) { 166 JobDefinition jobDefinition = jobByName.getOrDefault(jobName, null); 167 if (jobDefinition == null) { 168 throw new CommandException("Job not found: " + jobName); 169 } 170 Class<? extends Job> jobClass = jobDefinition.getJobClass(); 171 Job job = null; 172 try { 173 job = jobClass.newInstance(); 174 } catch (InstantiationException e) { 175 throw new RuntimeException(e); 176 } catch (IllegalAccessException e) { 177 throw new RuntimeException(e); 178 } 179 JobStatus status = JobStatusController.instance().forUniqueKey("name", jobName); 180 JobInstanceDispatcher dispatcher = new JobInstanceDispatcher(status, jobDefinition, job, forceEvenIfLocked, utcNow()); 181 182 dispatcher.run(); 183 } 184 185 /** 186 * Register the given job definition to be run as a recurring class 187 * 188 * @param job 189 */ 190 public void registerJob(JobDefinition job) { 191 doRegisterJob(job, utcNow()); 192 } 193 194 /** 195 * Called by unittests to load the job, with dateTime passed in 196 * to override the current time 197 * @param job 198 * @param now 199 */ 200 public void registerJobForTest(JobDefinition job, ZonedDateTime now) { 201 doRegisterJob(job, now); 202 } 203 204 205 private void doRegisterJob(JobDefinition job, ZonedDateTime now) { 206 if (shouldShutDown) { 207 Log.warn("Tried to add a job while the JobCoordinator was shutting down."); 208 return; 209 } 210 // Verify the job class exists and extends IJob 211 //Class cls; 212 //try { 213 // cls = getClass().getClassLoader().loadClass(job.getJobClassName()); 214 //} catch (ClassNotFoundException e) { 215 // throw new RuntimeException("Could not find the job class: " + job.getJobClassName(), e); 216 //} 217 //if (!IJob.class.isAssignableFrom(cls)) { 218 // throw new UsageException("Job class " + job.getJobClassName() + " does not implement interface IJob"); 219 //} 220 if (job.getJobClass() == null) { 221 throw new UsageException("Missing an IJob class"); 222 } 223 if (empty(job.getName())) { 224 job.setName(job.getJobClassName()); 225 } 226 if (registeredJobs.contains(job.getName())) { 227 throw new ConfigException("You tried to load the same job twice! If you want to load multiple jobs with the same class, be sure to set the 'name' field of the job definition. Job name: " + job.getName()); 228 } 229 230 231 232 233 registeredJobs.add(job.getName()); 234 jobByName.put(job.getName(), job); 235 236 JobStatusController.instance().initializeJobStatus(job, now); 237 } 238 239 240 public JobDefinition getJobDefinition(String name) { 241 return jobByName.getOrDefault(name, null); 242 } 243 244 /** 245 * Get a list of the health of all jobs. 246 * @return 247 */ 248 public List<JobHealthInfo> buildJobHealthInfos() { 249 List<JobHealthInfo> infos = list(); 250 for(JobDefinition job: jobByName.values()) { 251 if (job == null || empty(job.getName())) { 252 continue; 253 } 254 JobHealthInfo health = new JobHealthInfo(); 255 infos.add(health); 256 JobStatus status = JobStatusController.instance().getOrCreateForName(job.getName()); 257 // TODO match the names on all these fields; 258 health.setRunningNow(status.getStartedAt() > status.getCompletedAt() && status.getStartedAt() > status.getFailedAt()); 259 health.setError(status.getError()); 260 health.setJobName(job.getName()); 261 health.setLastFailedAt(status.getFailedAt()); 262 health.setLastFinishedAt(status.getCompletedAt()); 263 health.setLastRunSucceeded(status.getFailedAt()==0); 264 health.setExpectCompleteBy(status.getShouldSucceedBy()); 265 health.setLastFinishedAt(status.getCompletedAt()); 266 health.setLastRunTime(status.getLastDurationSeconds()); 267 health.setFailCount(status.getFailCount()); 268 health.setNextExecuteMinuteStamp(status.getNextExecuteMinuteStamp()); 269 } 270 return infos; 271 } 272 273}