001/*
002 * Stallion Core: A Modern Web Framework
003 *
004 * Copyright (C) 2015 - 2016 Stallion Software LLC.
005 *
006 * This program is free software: you can redistribute it and/or modify it under the terms of the
007 * GNU General Public License as published by the Free Software Foundation, either version 2 of
008 * the License, or (at your option) any later version. This program is distributed in the hope that
009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
011 * License for more details. You should have received a copy of the GNU General Public License
012 * along with this program.  If not, see <http://www.gnu.org/licenses/gpl-2.0.html>.
013 *
014 *
015 *
016 */
017
018package io.stallion.jobs;
019
import io.stallion.exceptions.CommandException;
import io.stallion.exceptions.ConfigException;
import io.stallion.exceptions.UsageException;
import io.stallion.services.Log;
import io.stallion.settings.Settings;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import static io.stallion.utils.Literals.*;

import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
034
035/**
036 * A service that handles running recurring jobs in the background.
037 */
038public class JobCoordinator extends Thread {
039
040    private static JobCoordinator _instance;
041
042    public static JobCoordinator instance() {
043        if (_instance == null) {
044            _instance = new JobCoordinator();
045            _instance.setName("stallion-job-coordinator");
046            //throw new ConfigException("You tried to access the JobCoordinator.instance(), but startUp() method was never called, jobs are not running.");
047        }
048        return _instance;
049    }
050
051    public static void startUp() {
052        instance().start();
053    }
054
055
056    /**
057     * Creates an instance, but does not start it.
058     * Used by unittests.
059     */
060    public static void initForTesting() {
061        if (_instance != null) {
062            throw new ConfigException("You cannot startup two job coordinators!");
063        }
064        _instance = new JobCoordinator();
065        _instance.synchronousMode = true;
066    }
067
068    public static void shutdown() {
069        if (_instance == null) {
070            return;
071        }
072        _instance.shouldShutDown = true;
073        if (_instance.isAlive()) {
074            _instance.interrupt();
075            try {
076                _instance.join();
077            } catch(InterruptedException e) {
078
079            }
080        }
081        _instance = null;
082    }
083
084    /* Instance methods     */
085    private JobCoordinator() {
086        queue = new PriorityBlockingQueue<>();
087        BasicThreadFactory factory = new BasicThreadFactory.Builder()
088                .namingPattern("stallion-job-execution-thread-%d")
089                .build();
090        // Create an executor service for single-threaded execution
091        pool = Executors.newFixedThreadPool(25, factory);
092        registeredJobs = new HashSet<>();
093    };
094
095    private Executor pool;
096    private boolean shouldShutDown = false;
097    private PriorityBlockingQueue<JobDefinition> queue;
098    private Set<String> registeredJobs;
099    private Map<String, Long> lastRanAtByJobName = new HashMap<>();
100    private Map<String, JobDefinition> jobByName = map();
101    private Boolean synchronousMode = false;
102
103    @Override
104    public void run() {
105
106        if (Settings.instance().getLocalMode() && ("prod".equals(Settings.instance().getEnv()) || "qa".equals(Settings.instance().getEnv()))) {
107            Log.info("Running localMode, environment is QA or PROD, thus not running jobs. Do not want to run production jobs locally!");
108            return;
109        }
110
111        while (!shouldShutDown) {
112            try {
113                executeJobsForCurrentTime(utcNow());
114            } catch(Exception e) {
115                Log.exception(e, "Error executing jobs in the main job coordinator loop!!!");
116            }
117
118
119            // Find the seconds until the next minute, sleep until 10 seconds into the next minute
120            ZonedDateTime now = utcNow();
121            ZonedDateTime nextMinute = now.withSecond(0).plusMinutes(1);
122            Long waitSeconds = (nextMinute.toInstant().getEpochSecond() - now.toInstant().getEpochSecond()) + 10;
123            try {
124                Thread.sleep(waitSeconds*1000);
125            } catch (InterruptedException e) {
126                break;
127            }
128
129        }
130
131        // Shut down the thread pool
132        // Wait for everything to exit
133
134
135    }
136
137    /**
138     * This really should not be a public method, but needs to be so unittests can access it.
139     * This is called by the main run() loop to actually find jobs to run and trigger their running
140     * @param now
141     * @throws ClassNotFoundException
142     * @throws IllegalAccessException
143     * @throws InstantiationException
144     */
145    public void executeJobsForCurrentTime(ZonedDateTime now) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
146        now = now.withSecond(0).withNano(0);
147        Log.finest("Checking for jobs to execute this period");
148        for (JobStatus jobStatus: JobStatusController.instance().findJobsForPeriod(now)) {
149            JobDefinition definition = JobCoordinator.instance().getJobDefinition(jobStatus.getName());
150            lastRanAtByJobName.put(definition.getName(), now.toInstant().toEpochMilli());
151            Log.info("Dispatching job {0}", definition.getName());
152            // Now start-up a thread to actually run the job
153            Class<? extends Job> jobClass = definition.getJobClass();
154            Job job = jobClass.newInstance();
155            JobInstanceDispatcher dispatcher = new JobInstanceDispatcher(jobStatus, definition, job, false, now);
156            if (synchronousMode) {
157                dispatcher.run();
158            } else {
159                pool.execute(dispatcher);
160            }
161
162        }
163    }
164
165    public void forceRunJob(String jobName, boolean forceEvenIfLocked) {
166        JobDefinition jobDefinition = jobByName.getOrDefault(jobName, null);
167        if (jobDefinition == null) {
168            throw new CommandException("Job not found: " + jobName);
169        }
170        Class<? extends Job> jobClass = jobDefinition.getJobClass();
171        Job job = null;
172        try {
173            job = jobClass.newInstance();
174        } catch (InstantiationException e) {
175            throw new RuntimeException(e);
176        } catch (IllegalAccessException e) {
177            throw new RuntimeException(e);
178        }
179        JobStatus status = JobStatusController.instance().forUniqueKey("name", jobName);
180        JobInstanceDispatcher dispatcher = new JobInstanceDispatcher(status, jobDefinition, job, forceEvenIfLocked, utcNow());
181
182        dispatcher.run();
183    }
184
185    /**
186     * Register the given job definition to be run as a recurring class
187     *
188     * @param job
189     */
190    public void registerJob(JobDefinition job) {
191        doRegisterJob(job, utcNow());
192    }
193
194    /**
195     * Called by unittests to load the job, with dateTime passed in
196     * to override the current time
197     * @param job
198     * @param now
199     */
200    public void registerJobForTest(JobDefinition job, ZonedDateTime now) {
201        doRegisterJob(job, now);
202    }
203
204
205    private void doRegisterJob(JobDefinition job, ZonedDateTime now) {
206        if (shouldShutDown) {
207            Log.warn("Tried to add a job while the JobCoordinator was shutting down.");
208            return;
209        }
210        // Verify the job class exists and extends IJob
211        //Class cls;
212        //try {
213        //    cls = getClass().getClassLoader().loadClass(job.getJobClassName());
214        //} catch (ClassNotFoundException e) {
215        //    throw new RuntimeException("Could not find the job class: " + job.getJobClassName(), e);
216        //}
217        //if (!IJob.class.isAssignableFrom(cls)) {
218        //    throw new UsageException("Job class " + job.getJobClassName() + " does not implement interface IJob");
219        //}
220        if (job.getJobClass() == null) {
221            throw new UsageException("Missing an IJob class");
222        }
223        if (empty(job.getName())) {
224            job.setName(job.getJobClassName());
225        }
226        if (registeredJobs.contains(job.getName())) {
227            throw new ConfigException("You tried to load the same job twice! If you want to load multiple jobs with the same class, be sure to set the 'name' field of the job definition. Job name: " + job.getName());
228        }
229
230
231
232
233        registeredJobs.add(job.getName());
234        jobByName.put(job.getName(), job);
235
236        JobStatusController.instance().initializeJobStatus(job, now);
237    }
238
239
240    public JobDefinition getJobDefinition(String name) {
241        return jobByName.getOrDefault(name, null);
242    }
243
244    /**
245     * Get a list of the health of all jobs.
246     * @return
247     */
248    public List<JobHealthInfo> buildJobHealthInfos() {
249        List<JobHealthInfo> infos = list();
250        for(JobDefinition job: jobByName.values()) {
251            if (job == null || empty(job.getName())) {
252                continue;
253            }
254            JobHealthInfo health = new JobHealthInfo();
255            infos.add(health);
256            JobStatus status = JobStatusController.instance().getOrCreateForName(job.getName());
257            // TODO match the names on all these fields;
258            health.setRunningNow(status.getStartedAt() > status.getCompletedAt() && status.getStartedAt() > status.getFailedAt());
259            health.setError(status.getError());
260            health.setJobName(job.getName());
261            health.setLastFailedAt(status.getFailedAt());
262            health.setLastFinishedAt(status.getCompletedAt());
263            health.setLastRunSucceeded(status.getFailedAt()==0);
264            health.setExpectCompleteBy(status.getShouldSucceedBy());
265            health.setLastFinishedAt(status.getCompletedAt());
266            health.setLastRunTime(status.getLastDurationSeconds());
267            health.setFailCount(status.getFailCount());
268            health.setNextExecuteMinuteStamp(status.getNextExecuteMinuteStamp());
269        }
270        return infos;
271    }
272
273}