/*
 * Stallion Core: A Modern Web Framework
 *
 * Copyright (C) 2015 - 2016 Stallion Software LLC.
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 2 of
 * the License, or (at your option) any later version. This program is distributed in the hope that
 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
 * License for more details. You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/gpl-2.0.html>.
 *
 *
 *
 */

package io.stallion.jobs;

import io.stallion.Context;
import io.stallion.dataAccess.DataAccessRegistration;
import io.stallion.dataAccess.NoStash;
import io.stallion.dataAccess.StandardModelController;
import io.stallion.dataAccess.db.DB;
import io.stallion.dataAccess.db.DbPersister;
import io.stallion.dataAccess.file.JsonFilePersister;
import io.stallion.dataAccess.filtering.FilterOperator;
import io.stallion.services.Log;
import io.stallion.utils.DateUtils;
import io.stallion.utils.GeneralUtils;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

import static io.stallion.utils.Literals.empty;
import static io.stallion.utils.Literals.list;
import static io.stallion.utils.Literals.utcNow;

/**
 * Handles the storing and retrieval of JobStatus records in either the database
 * or file-system.
 *
 * <p>Each job has one {@link JobStatus} record, keyed by its unique {@code name}, that tracks
 * when the job should next execute (as both a timestamp and a minute-resolution string stamp)
 * and whether the job is currently locked.</p>
 */
public class JobStatusController extends StandardModelController<JobStatus> {
    /** A lock older than this many milliseconds (120 minutes) is treated as abandoned. */
    private static final long STALE_LOCK_MILLIS = 2L * 60 * 60 * 1000;

    /** A job whose scheduled time is more than this many milliseconds (5 minutes) in the past is rescheduled. */
    private static final long STALE_SCHEDULE_MILLIS = 5L * 60 * 1000;

    /** Fields that must be unique across all JobStatus records. */
    private static final Set<String> UNIQUE_FIELDS = new HashSet<>(Arrays.asList("name"));

    /**
     * Minute-resolution stamp used for the {@code nextExecuteMinuteStamp} field.
     *
     * <p>Uses {@code yyyy} (calendar year) rather than {@code YYYY} (week-based year): stamps are
     * compared as strings, and the week-based year differs from the calendar year for several days
     * around New Year, which would break that ordering.</p>
     */
    public DateTimeFormatter TIME_STAMP_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    /**
     * @return the controller registered under the {@code st_job_status} bucket
     */
    public static JobStatusController instance() {
        return (JobStatusController) Context.dal().get("st_job_status");
    }

    /**
     * Registers this controller with the data access layer. Records are stored as JSON files in
     * the data folder by default; when a database is configured they are stored in the
     * {@code stallion_job_status} table instead.
     */
    public static void selfRegister() {
        DataAccessRegistration registration = new DataAccessRegistration()
                .setControllerClass(JobStatusController.class)
                .setModelClass(JobStatus.class)
                .setPath("job-status")
                .setBucket("st_job_status")
                .setPersisterClass(JsonFilePersister.class)
                .setShouldWatch(false)
                .setUseDataFolder(true)
                .setWritable(true);
        if (DB.instance() != null) {
            registration
                    .setBucket("st_job_status")
                    .setPersisterClass(DbPersister.class)
                    .setPath("")
                    .setTableName("stallion_job_status");
        }
        Context.dal().register(registration);
    }

    /**
     * Finds the jobs that are due to run as of the current UTC time.
     *
     * @return the jobs that should be executed now
     * @see #findJobsForPeriod(ZonedDateTime)
     */
    public List<JobStatus> findJobsForPeriod() {
        return findJobsForPeriod(DateUtils.utcNow());
    }

    /**
     * Finds the jobs that are due to run at or before the given time (truncated to the minute).
     *
     * <p>As a side effect, job status records that are found in an inconsistent state are
     * repaired and saved rather than returned:</p>
     * <ul>
     *   <li>records with no next-execute time are rescheduled;</li>
     *   <li>records locked for more than 120 minutes have their lock cleared and are rescheduled;</li>
     *   <li>records whose scheduled time is more than five minutes in the past are rescheduled.</li>
     * </ul>
     * Records with no matching job definition, and records that are currently locked, are skipped.
     *
     * @param now the reference time
     * @return the jobs that should be executed for this period
     */
    public List<JobStatus> findJobsForPeriod(ZonedDateTime now) {
        List<JobStatus> returnJobs = list();
        now = now.withSecond(0).withNano(0);
        String nowStamp = TIME_STAMP_FORMAT.format(now);
        long nowMillis = now.toInstant().toEpochMilli();

        List<JobStatus> allJobs;
        if (!getPersister().isDbBacked()) {
            allJobs = filterBy("nextExecuteMinuteStamp", nowStamp, FilterOperator.LESS_THAN_OR_EQUAL).all();
        } else {
            // NOTE(review): this query excludes locked rows, so the stale-lock reset in the loop
            // below appears to be reachable only for the file-backed persister -- confirm whether
            // DB-backed stale locks are meant to be recovered here as well.
            allJobs = DB.instance().query(JobStatus.class, "SELECT * FROM stallion_job_status WHERE nextexecuteminutestamp<=? AND lockedAt=0 AND lockGuid=''", nowStamp);
        }

        String firstJobName = "";
        if (allJobs.size() > 0) {
            firstJobName = ", first job name is " + allJobs.get(0).getName();
        }
        Log.fine("JobCoordinator: Found {0} jobs for period {1} {2}", allJobs.size(), nowStamp, firstJobName);

        for (JobStatus job : allJobs) {
            Log.fine("Job found for period {0} {1} {2}", job.getName(), nowStamp, job.getNextExecuteMinuteStamp());
            JobDefinition definition = JobCoordinator.instance().getJobDefinition(job.getName());
            if (definition == null) {
                Log.warn("No job found for jobStatus with name {0}", job.getName());
                continue;
            }
            if (empty(job.getNextExecuteMinuteStamp()) || empty(job.getNextExecuteAt())) {
                Log.warn("Job nextExecute is empty {0}", job.getName());
                resetNextRunTime(job, now.plusMinutes(1));
                continue;
            }
            long milsAgo = nowMillis - job.getNextExecuteAt().toInstant().toEpochMilli();
            if (!empty(job.getLockedAt())) {
                Log.info("Job is locked: {0} {1}", job.getName(), job.getLockedAt());
                // lockedAt is written from DateUtils.mils() in lockJob, so the age must be computed
                // from epoch milliseconds; comparing epoch seconds against it never exceeds the limit.
                if ((nowMillis - job.getLockedAt()) > STALE_LOCK_MILLIS) {
                    Log.warn("Job locked for more than 120 minutes! Resetting the lock.");
                    resetLockAndNextRunAt(job, now.plusMinutes(1));
                }
                continue;
            }
            // If the job is more than five minutes stale, we recalculate it and save it
            if (!nowStamp.equals(job.getNextExecuteMinuteStamp()) && milsAgo > STALE_SCHEDULE_MILLIS) {
                Log.warn("Job stamp is too far behind {0} {1}", job.getName(), job.getNextExecuteMinuteStamp());
                resetNextRunTime(job, now.plusMinutes(1));
            } else {
                returnJobs.add(job);
            }
        }
        return returnJobs;
    }

    /**
     * Clears the lock on the job and schedules its next run relative to the current UTC time.
     *
     * @param jobStatus the record to update and save
     */
    public void resetLockAndNextRunAt(JobStatus jobStatus) {
        resetLockAndNextRunAt(jobStatus, DateUtils.utcNow());
    }

    /**
     * Clears the lock on the job, schedules its next run relative to {@code now}, and saves it.
     *
     * @param jobStatus the record to update and save
     * @param now       the time from which the next scheduled run is calculated
     */
    public void resetLockAndNextRunAt(JobStatus jobStatus, ZonedDateTime now) {
        jobStatus.setLockedAt(0L);
        jobStatus.setLockGuid("");
        scheduleNextRunAndSave(jobStatus, now);
    }

    /**
     * Schedules the job's next run relative to the current UTC time, leaving any lock untouched.
     *
     * @param jobStatus the record to update and save
     */
    public void resetNextRunTime(JobStatus jobStatus) {
        resetNextRunTime(jobStatus, DateUtils.utcNow());
    }

    /**
     * Schedules the job's next run relative to {@code now} and saves it, leaving any lock untouched.
     *
     * @param jobStatus the record to update and save
     * @param now       the time from which the next scheduled run is calculated
     */
    public void resetNextRunTime(JobStatus jobStatus, ZonedDateTime now) {
        scheduleNextRunAndSave(jobStatus, now);
    }

    /**
     * Looks up the job definition by name, computes the next execution time from its schedule,
     * stores both the timestamp and the minute stamp on the record, and saves it.
     */
    private void scheduleNextRunAndSave(JobStatus jobStatus, ZonedDateTime now) {
        JobDefinition definition = JobCoordinator.instance().getJobDefinition(jobStatus.getName());
        ZonedDateTime next = definition.getSchedule().nextAt(now);
        jobStatus.setNextExecuteAt(next);
        jobStatus.setNextExecuteMinuteStamp(TIME_STAMP_FORMAT.format(next));
        Log.fine("Job {0} set to run next at {1}", jobStatus.getName(), jobStatus.getNextExecuteMinuteStamp());
        save(jobStatus);
    }

    /**
     * Ensures a usable status record exists for the given job definition.
     *
     * <p>A previously deleted record is revived with its lock and schedule cleared; a missing
     * record is created. In either case, or whenever the record has no next-execute stamp, the
     * next run time is then calculated relative to {@code now}.</p>
     *
     * @param definition the job whose status record should be initialized
     * @param now        the time from which the next scheduled run is calculated
     */
    public void initializeJobStatus(JobDefinition definition, ZonedDateTime now) {
        JobStatus job = forUniqueKey("name", definition.getName());
        if (job != null && job.getDeleted()) {
            job.setDeleted(false);
            job.setLockedAt(0L);
            job.setLockGuid("");
            job.setName(definition.getName());
            job.setNextExecuteMinuteStamp("");
            job.setNextExecuteAt(null);
            save(job);
        } else if (job == null) {
            job = new JobStatus();
            job.setName(definition.getName());
            job.setId(Context.dal().getTickets().nextId());
            save(job);
        }
        if (empty(job.getNextExecuteMinuteStamp())) {
            resetNextRunTime(job, now);
        }
    }

    /**
     * Attempts to acquire the lock for the named job.
     *
     * <p>With a database-backed persister the lock is taken with a conditional UPDATE that only
     * matches an unlocked row, and is then confirmed by looking for the generated lock GUID.
     * With the file-backed persister the record is checked and updated directly.</p>
     *
     * @param name the unique job name
     * @return {@code true} if the lock was acquired; {@code false} if the job is already locked
     *         or no status record exists for it
     */
    public boolean lockJob(String name) {
        String lockId = UUID.randomUUID().toString();
        Long lockedAt = DateUtils.mils();
        if (!getPersister().isDbBacked()) {
            JobStatus job = forUniqueKey("name", name);
            if (job == null) {
                Log.warn("Cannot lock job {0}: no job status record exists", name);
                return false;
            }
            if (!empty(job.getLockGuid()) || !empty(job.getLockedAt())) {
                return false;
            }
            job.setLockedAt(lockedAt);
            job.setLockGuid(lockId);
            // Save the lock, consistent with the DB branch below and with unlockJob.
            save(job);
            return true;
        }

        DB.instance().execute(
                "UPDATE stallion_job_status SET lockedAt=?, lockGuid=? WHERE lockedAt = 0 AND lockGuid='' AND name=?",
                lockedAt, lockId, name
        );
        Long count = DB.instance().queryScalar("SELECT COUNT(*) FROM stallion_job_status WHERE lockGuid=? AND name=?",
                lockId, name
        );
        if (count == null || count == 0) {
            return false;
        }
        // Gotta do this so that the in memory version also has the lock
        JobStatus job = forUniqueKey("name", name);
        if (job != null) {
            job.setLockGuid(lockId);
            job.setLockedAt(lockedAt);
            save(job);
        }
        return true;
    }

    /**
     * Clears the lock on the named job and saves the record. Does nothing (beyond logging a
     * warning) when no status record exists for the name.
     *
     * @param name the unique job name
     */
    public void unlockJob(String name) {
        JobStatus job = forUniqueKey("name", name);
        if (job == null) {
            Log.warn("Cannot unlock job {0}: no job status record exists", name);
            return;
        }
        job.setLockGuid("");
        job.setLockedAt(0L);
        save(job);
    }

    /**
     * Returns the status record for the named job, creating and saving a new one if none exists.
     *
     * @param jobName the unique job name
     * @return the existing or newly created record
     */
    public JobStatus getOrCreateForName(String jobName) {
        JobStatus job = forUniqueKey("name", jobName);
        if (job == null) {
            job = new JobStatus();
            job.setName(jobName);
            job.setId(Context.dal().getTickets().nextId());
            save(job);
        }
        return job;
    }

    /**
     * @return the set of fields that must be unique per record; here only {@code name}
     */
    @Override
    public Set<String> getUniqueFields() {
        return UNIQUE_FIELDS;
    }
}