001/* 002 * Stallion Core: A Modern Web Framework 003 * 004 * Copyright (C) 2015 - 2016 Stallion Software LLC. 005 * 006 * This program is free software: you can redistribute it and/or modify it under the terms of the 007 * GNU General Public License as published by the Free Software Foundation, either version 2 of 008 * the License, or (at your option) any later version. This program is distributed in the hope that 009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of 010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public 011 * License for more details. You should have received a copy of the GNU General Public License 012 * along with this program. If not, see <http://www.gnu.org/licenses/gpl-2.0.html>. 013 * 014 * 015 * 016 */ 017 018package io.stallion.asyncTasks; 019 020import io.stallion.dataAccess.db.DB; 021import io.stallion.dataAccess.db.DbPersister; 022import io.stallion.services.Log; 023import io.stallion.settings.Settings; 024import io.stallion.utils.DateUtils; 025import io.stallion.utils.GeneralUtils; 026import org.apache.commons.lang3.exception.ExceptionUtils; 027 028import java.util.UUID; 029 030import static io.stallion.utils.Literals.*; 031 032 033public class AsyncTaskDbPersister extends DbPersister<AsyncTask> implements AsyncTaskPersister { 034 035 public AsyncTask findAndLockNextTask(Long now) { 036 return findAndLockNextTask(now, 0); 037 } 038 039 040 public AsyncTask findAndLockNextTask(Long now, int depth) { 041 // Stop recursion at ten queries 042 if (depth > 10) { 043 return null; 044 } 045 String localMode = ""; 046 if (Settings.instance().getLocalMode()) { 047 localMode = or(System.getenv("USER"), GeneralUtils.slugify(Settings.instance().getTargetFolder())); 048 } 049 // Do not execute tasks that are more than 2 days stale 050 Long minTime = now - 86400 * 2 * 1000; 051 AsyncTask task = DB.instance().queryForOne( 052 AsyncTask.class, 053 "SELECT * FROM stallion_async_tasks WHERE lockUuid='' AND executeAt<=? AND executeAt>? AND " + 054 " completedAt=0 AND localMode=? ORDER BY executeAt ASC", 055 now, minTime, localMode); 056 057 if (task == null) { 058 return null; 059 } 060 String lockUuid = UUID.randomUUID().toString(); 061 Long lockedAt = mils(); 062 int affected = DB.instance().execute("UPDATE stallion_async_tasks SET lockedAt=?, lockUuid=? WHERE lockUuid='' AND id=?", lockedAt, lockUuid, task.getId()); 063 if (affected == 0) { 064 // This will happen if another thread or process locks the row in between the SELECT and the UPDATE. 065 // If this happens, we just run the whole method again to get another row. 066 // We could have done a SELECT FOR UPDATE, but we want to minimize locking. So better to do optimisic 067 // locking rather than overly lock and create problems for MySQL 068 return findAndLockNextTask(now, depth + 1); 069 } else { 070 task.setLockedAt(lockedAt); 071 task.setLockUuid(lockUuid); 072 return task; 073 } 074 } 075 076 @Override 077 public boolean markFailed(AsyncTask task, Throwable e) { 078 Log.info("Mark task failed: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey()); 079 task.setTryCount(task.getTryCount() + 1); 080 task.setErrorMessage(e.toString() + ExceptionUtils.getStackTrace(e)); 081 if (task.getTryCount() >= 5) { 082 task.setFailedAt(DateUtils.mils()); 083 Log.info("Mark task failed permanently: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey()); 084 } else { 085 task.setExecuteAt(DateUtils.mils() + ((2^task.getTryCount())*1000)); 086 task.setLockedAt(0); 087 task.setLockUuid(""); 088 } 089 persist(task); 090 return true; 091 } 092 093 @Override 094 public boolean markComplete(AsyncTask task) { 095 Log.info("Mark task complete: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey()); 096 task.setCompletedAt(mils()); 097 persist(task); 098 return true; 099 } 100 101 102 public boolean lockForProcessing(AsyncTask task) { 103 return false; 104 } 105 106 107 @Override 108 public void deleteOldTasks() { 109 Long before = DateUtils.utcNow().minusDays(40).toInstant().toEpochMilli(); 110 DB.instance().execute("DELETE FROM `stallion_async_tasks` WHERE completedAt > 0 AND completedAt<? ", before); 111 } 112 113}