001/*
002 * Stallion Core: A Modern Web Framework
003 *
004 * Copyright (C) 2015 - 2016 Stallion Software LLC.
005 *
006 * This program is free software: you can redistribute it and/or modify it under the terms of the
007 * GNU General Public License as published by the Free Software Foundation, either version 2 of
008 * the License, or (at your option) any later version. This program is distributed in the hope that
009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
011 * License for more details. You should have received a copy of the GNU General Public License
012 * along with this program.  If not, see <http://www.gnu.org/licenses/gpl-2.0.html>.
013 *
014 *
015 *
016 */
017
018package io.stallion.asyncTasks;
019
020import io.stallion.dataAccess.db.DB;
021import io.stallion.dataAccess.db.DbPersister;
022import io.stallion.services.Log;
023import io.stallion.settings.Settings;
024import io.stallion.utils.DateUtils;
025import io.stallion.utils.GeneralUtils;
026import org.apache.commons.lang3.exception.ExceptionUtils;
027
028import java.util.UUID;
029
030import static io.stallion.utils.Literals.*;
031
032
033public class AsyncTaskDbPersister extends DbPersister<AsyncTask> implements AsyncTaskPersister {
034
035    public AsyncTask findAndLockNextTask(Long now) {
036        return findAndLockNextTask(now, 0);
037    }
038
039
040    public AsyncTask findAndLockNextTask(Long now, int depth) {
041        // Stop recursion at ten queries
042        if (depth > 10) {
043            return null;
044        }
045        String localMode = "";
046        if (Settings.instance().getLocalMode()) {
047            localMode = or(System.getenv("USER"), GeneralUtils.slugify(Settings.instance().getTargetFolder()));
048        }
049        // Do not execute tasks that are more than 2 days stale
050        Long minTime = now - 86400 * 2 * 1000;
051        AsyncTask task = DB.instance().queryForOne(
052                AsyncTask.class,
053                "SELECT * FROM stallion_async_tasks WHERE lockUuid='' AND executeAt<=? AND executeAt>? AND " +
054                        " completedAt=0 AND localMode=? ORDER BY executeAt ASC",
055                now, minTime, localMode);
056
057        if (task == null) {
058            return null;
059        }
060        String lockUuid = UUID.randomUUID().toString();
061        Long lockedAt = mils();
062        int affected = DB.instance().execute("UPDATE stallion_async_tasks SET lockedAt=?, lockUuid=? WHERE lockUuid='' AND id=?", lockedAt, lockUuid, task.getId());
063        if (affected == 0) {
064            // This will happen if another thread or process locks the row in between the SELECT and the UPDATE.
065            // If this happens, we just run the whole method again to get another row.
066            // We could have done a SELECT FOR UPDATE, but we want to minimize locking. So better to do optimisic
067            // locking rather than overly lock and create problems for MySQL
068            return findAndLockNextTask(now, depth + 1);
069        } else {
070            task.setLockedAt(lockedAt);
071            task.setLockUuid(lockUuid);
072            return task;
073        }
074    }
075
076    @Override
077    public boolean markFailed(AsyncTask task, Throwable e) {
078        Log.info("Mark task failed: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey());
079        task.setTryCount(task.getTryCount() + 1);
080        task.setErrorMessage(e.toString() + ExceptionUtils.getStackTrace(e));
081        if (task.getTryCount() >= 5) {
082            task.setFailedAt(DateUtils.mils());
083            Log.info("Mark task failed permanently: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey());
084        } else {
085            task.setExecuteAt(DateUtils.mils() + ((2^task.getTryCount())*1000));
086            task.setLockedAt(0);
087            task.setLockUuid("");
088        }
089        persist(task);
090        return true;
091    }
092
093    @Override
094    public boolean markComplete(AsyncTask task) {
095        Log.info("Mark task complete: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey());
096        task.setCompletedAt(mils());
097        persist(task);
098        return true;
099    }
100
101
102    public boolean lockForProcessing(AsyncTask task) {
103        return false;
104    }
105
106
107    @Override
108    public void deleteOldTasks() {
109        Long before = DateUtils.utcNow().minusDays(40).toInstant().toEpochMilli();
110        DB.instance().execute("DELETE FROM `stallion_async_tasks` WHERE completedAt > 0 AND completedAt<? ", before);
111    }
112
113}