001/*
002 * Stallion Core: A Modern Web Framework
003 *
004 * Copyright (C) 2015 - 2016 Stallion Software LLC.
005 *
006 * This program is free software: you can redistribute it and/or modify it under the terms of the
007 * GNU General Public License as published by the Free Software Foundation, either version 2 of
008 * the License, or (at your option) any later version. This program is distributed in the hope that
009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
011 * License for more details. You should have received a copy of the GNU General Public License
012 * along with this program.  If not, see <http://www.gnu.org/licenses/gpl-2.0.html>.
013 *
014 *
015 *
016 */
017
018package io.stallion.jobs;
019
020import io.stallion.Context;
021import io.stallion.dataAccess.DataAccessRegistration;
022import io.stallion.dataAccess.NoStash;
023import io.stallion.dataAccess.StandardModelController;
024import io.stallion.dataAccess.db.DB;
025import io.stallion.dataAccess.db.DbPersister;
026import io.stallion.dataAccess.file.JsonFilePersister;
027import io.stallion.dataAccess.filtering.FilterOperator;
028import io.stallion.services.Log;
029import io.stallion.utils.DateUtils;
030import io.stallion.utils.GeneralUtils;
031
032import java.time.ZonedDateTime;
033import java.time.format.DateTimeFormatter;
034import java.util.*;
035
036import static io.stallion.utils.Literals.empty;
037import static io.stallion.utils.Literals.list;
038import static io.stallion.utils.Literals.utcNow;
039
040/**
041 * Handles the storing and retrieval of JobStatus records in either the database
042 * or file-system.
043 */
044public class JobStatusController extends StandardModelController<JobStatus> {
045    public DateTimeFormatter TIME_STAMP_FORMAT = DateTimeFormatter.ofPattern("YYYYMMddHHmm");
046
047    public static JobStatusController instance() {
048        return (JobStatusController)Context.dal().get("st_job_status");
049    }
050
051    public static void selfRegister()  {
052        DataAccessRegistration registration = new DataAccessRegistration()
053                .setControllerClass(JobStatusController.class)
054                .setModelClass(JobStatus.class)
055                .setPath("job-status")
056                .setBucket("st_job_status")
057                .setPersisterClass(JsonFilePersister.class)
058                .setShouldWatch(false)
059                .setUseDataFolder(true)
060                .setWritable(true);
061        if (DB.instance() != null) {
062            registration
063                    .setBucket("st_job_status")
064                    .setPersisterClass(DbPersister.class)
065                    .setPath("")
066                    .setTableName("stallion_job_status");
067        }
068        Context.dal().register(registration);
069
070    }
071
072    public List<JobStatus> findJobsForPeriod() {
073        return findJobsForPeriod(DateUtils.utcNow());
074    }
075    public List<JobStatus> findJobsForPeriod(ZonedDateTime now) {
076        List<JobStatus> returnJobs = list();
077        now = now.withSecond(0).withNano(0);
078        String nowStamp = TIME_STAMP_FORMAT.format(now);
079        List<JobStatus> allJobs;
080        if (!getPersister().isDbBacked()) {
081            allJobs = filterBy("nextExecuteMinuteStamp", nowStamp, FilterOperator.LESS_THAN_OR_EQUAL).all();
082
083        } else {
084            allJobs = DB.instance().query(JobStatus.class, "SELECT * FROM stallion_job_status WHERE nextexecuteminutestamp<=? AND lockedAt=0 AND lockGuid=''", nowStamp);
085        }
086        String firstJobName = "";
087        if (allJobs.size() > 0) {
088            firstJobName = ", first job name is " + allJobs.get(0).getName();
089        }
090        Log.fine("JobCoordinator: Found {0} jobs for period {1} {2}", allJobs.size(), nowStamp, firstJobName);
091        for(JobStatus job: allJobs) {
092            Log.fine("Job found for period {0} {1} {2}", job.getName(), nowStamp, job.getNextExecuteMinuteStamp());
093            JobDefinition definition = JobCoordinator.instance().getJobDefinition(job.getName());
094            if (definition == null) {
095                Log.warn("No job found for jobStatus with name {0}", job.getName());
096                continue;
097            }
098            if (empty(job.getNextExecuteMinuteStamp()) || empty(job.getNextExecuteAt())) {
099                Log.warn("Job nextExecute is empty {0}", job.getName());
100                resetNextRunTime(job, now.plusMinutes(1));
101                continue;
102            }
103            Long milsAgo = now.toInstant().toEpochMilli() - job.getNextExecuteAt().toInstant().toEpochMilli();
104            if (!empty(job.getLockedAt())) {
105                Log.info("Job is locked: {0} {1}", job.getName(), job.getLockedAt());
106                if ((now.toInstant().getEpochSecond() - job.getLockedAt()) > (2 * 60 * 60 * 1000)) {
107                    Log.warn("Job locked for more than 120 minutes! Resetting the lock.");
108                    resetLockAndNextRunAt(job, now.plusMinutes(1));
109                }
110                continue;
111            }
112            // If the job is more than five minutes stale, we recalculate it and save it
113            if (!nowStamp.equals(job.getNextExecuteMinuteStamp()) && milsAgo > (5 * 60 * 1000)) {
114                Log.warn("Job stamp is too far behind {0} {1}", job.getName(), job.getNextExecuteMinuteStamp());
115                resetNextRunTime(job, now.plusMinutes(1));
116            } else {
117                returnJobs.add(job);
118            }
119        }
120        return returnJobs;
121    }
122
123    public void resetLockAndNextRunAt(JobStatus jobStatus) {
124        resetLockAndNextRunAt(jobStatus, DateUtils.utcNow());
125    }
126
127    public void resetLockAndNextRunAt(JobStatus jobStatus, ZonedDateTime now) {
128        JobDefinition definition = JobCoordinator.instance().getJobDefinition(jobStatus.getName());
129        jobStatus.setLockedAt(0L);
130        jobStatus.setLockGuid("");
131        ZonedDateTime next = definition.getSchedule().nextAt(now);
132        jobStatus.setNextExecuteAt(next);
133        jobStatus.setNextExecuteMinuteStamp(TIME_STAMP_FORMAT.format(next));
134        Log.fine("Job {0} set to run next at {1}", jobStatus.getName(), jobStatus.getNextExecuteMinuteStamp());
135        save(jobStatus);
136    }
137
138    public void resetNextRunTime(JobStatus jobStatus) {
139        resetNextRunTime(jobStatus, DateUtils.utcNow());
140    }
141
142    public void resetNextRunTime(JobStatus jobStatus, ZonedDateTime now) {
143        JobDefinition definition = JobCoordinator.instance().getJobDefinition(jobStatus.getName());
144        ZonedDateTime next = definition.getSchedule().nextAt(now);
145        jobStatus.setNextExecuteAt(next);
146        jobStatus.setNextExecuteMinuteStamp(TIME_STAMP_FORMAT.format(next));
147        Log.fine("Job {0} set to run next at {1}", jobStatus.getName(), jobStatus.getNextExecuteMinuteStamp());
148        save(jobStatus);
149    }
150
151    public void initializeJobStatus(JobDefinition definition, ZonedDateTime now) {
152        JobStatus job = forUniqueKey("name", definition.getName());
153        if (job != null && job.getDeleted()) {
154            job.setDeleted(false);
155            job.setLockedAt(0L);
156            job.setLockGuid("");
157            job.setName(definition.getName());
158            job.setNextExecuteMinuteStamp("");
159            job.setNextExecuteAt(null);
160            save(job);
161        } else if (job == null) {
162            job = new JobStatus();
163            job.setName(definition.getName());
164            job.setId(Context.dal().getTickets().nextId());
165            save(job);
166        }
167        if (empty(job.getNextExecuteMinuteStamp())) {
168            resetNextRunTime(job, now);
169        }
170    }
171
172    private static Set<String> _uniqueFields = new HashSet<String>(Arrays.asList(new String[]{"name"}));
173
174    public boolean lockJob(String name) {
175        String lockId = UUID.randomUUID().toString();
176        Long lockedAt = DateUtils.mils();
177        if (!getPersister().isDbBacked()) {
178            JobStatus job = forUniqueKey("name", name);
179            if (empty(job.getLockGuid()) && empty(job.getLockedAt())) {
180                job.setLockedAt(lockedAt);
181                job.setLockGuid(lockId);
182                return true;
183            } else {
184                return false;
185            }
186        } else {
187            DB.instance().execute(
188                    "UPDATE stallion_job_status SET lockedAt=?, lockGuid=? WHERE lockedAt = 0 AND lockGuid='' AND name=?",
189                    lockedAt, lockId, name
190            );
191            Long count = DB.instance().queryScalar("SELECT COUNT(*) FROM stallion_job_status WHERE lockGuid=? AND name=?",
192                    lockId, name
193            );
194            if (count == 0) {
195                return false;
196            }
197            // Gotta do this so that the in memory version also has the lock
198            JobStatus job = forUniqueKey("name", name);
199            job.setLockGuid(lockId);
200            job.setLockedAt(lockedAt);
201            save(job);
202            return true;
203        }
204    }
205
206    public void unlockJob(String name) {
207        JobStatus job = forUniqueKey("name", name);
208        job.setLockGuid("");
209        job.setLockedAt(0L);
210        save(job);
211    }
212
213    public JobStatus getOrCreateForName(String jobName) {
214        JobStatus job = forUniqueKey("name", jobName);
215        if (job == null) {
216            job = new JobStatus();
217            job.setName(jobName);
218            job.setId(Context.dal().getTickets().nextId());
219            save(job);
220        }
221        return job;
222    }
223
224    @Override
225    public Set<String> getUniqueFields() {
226        return _uniqueFields;
227    }
228}