001/* 002 * Stallion Core: A Modern Web Framework 003 * 004 * Copyright (C) 2015 - 2016 Stallion Software LLC. 005 * 006 * This program is free software: you can redistribute it and/or modify it under the terms of the 007 * GNU General Public License as published by the Free Software Foundation, either version 2 of 008 * the License, or (at your option) any later version. This program is distributed in the hope that 009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of 010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public 011 * License for more details. You should have received a copy of the GNU General Public License 012 * along with this program. If not, see <http://www.gnu.org/licenses/gpl-2.0.html>. 013 * 014 * 015 * 016 */ 017 018package io.stallion.asyncTasks; 019 020import io.stallion.Context; 021import io.stallion.exceptions.AppException; 022import io.stallion.services.Log; 023import io.stallion.utils.DateUtils; 024import org.apache.commons.lang3.StringUtils; 025 026import java.util.HashSet; 027import java.util.concurrent.PriorityBlockingQueue; 028import java.util.concurrent.atomic.AtomicLong; 029 030import static io.stallion.utils.Literals.*; 031 032 033public class AsyncFileCoordinator extends AsyncCoordinator { 034 035 private AtomicLong counter = new AtomicLong(1); 036 private PriorityBlockingQueue<AsyncTask> taskQueue = new PriorityBlockingQueue<>(); 037 private HashSet<Object> seenTaskIds = new HashSet<>(); 038 039 040 041 042 @Override 043 public AsyncTask enqueue(AsyncTaskHandler handler) { 044 return enqueue(handler, null, 0); 045 } 046 047 @Override 048 public AsyncTask enqueue(AsyncTaskHandler handler, String customKey, long executeAt) { 049 AsyncTask task = new AsyncTask(handler, customKey, executeAt); 050 enqueue(task); 051 return task; 052 } 053 054 @Override 055 public void enqueue(AsyncTask task) { 056 if (StringUtils.isEmpty(task.getHandlerName())) { 057 throw new AppException("A task was enqueued, but getHandlerName() was blank"); 058 } 059 060 //TODO: If the task handler is not registered, then add it 061 //TODO: If it does not exist, raise a ConfigException 062 063 if (task.getOriginallyScheduledFor() > 0) { 064 task.setExecuteAt(task.getOriginallyScheduledFor()); 065 } else if (!empty(task.getExecuteAt()) && empty(task.getOriginallyScheduledFor())) { 066 task.setOriginallyScheduledFor(task.getExecuteAt()); 067 } 068 if (!StringUtils.isEmpty(task.getCustomKey())) { 069 try { 070 AsyncTask existing = AsyncTaskController.instance().forUniqueKey("customKey", task.getCustomKey()); 071 if (existing != null && existing.getCompletedAt() > 0) { 072 Log.info("Existing task already ran with customKey={0}", task.getCustomKey()); 073 return; 074 } else if (existing != null) { 075 existing.setDataJson(task.getDataJson()); 076 Boolean executeAtChanged = false; 077 if (task.getExecuteAt() > 0 && existing.getTryCount() == 0 && existing.getExecuteAt() != task.getExecuteAt()) { 078 existing.setExecuteAt(task.getExecuteAt()); 079 existing.setOriginallyScheduledFor(task.getOriginallyScheduledFor()); 080 executeAtChanged = true; 081 } 082 existing.setHandlerName(task.getHandlerName()); 083 updateTask(existing, executeAtChanged); 084 Log.info("Updating existing task with customKey={0}", task.getCustomKey()); 085 return; 086 } 087 } catch (Exception e) { 088 throw new RuntimeException(e); 089 } 090 } 091 if (task.getExecuteAt() == 0) { 092 // Set to the past, so will execute immediately even if picked up by another server with a slower clock 093 task.setExecuteAt(DateUtils.mils() - 15000); 094 } 095 String[] parts = task.getHandlerName().split("\\."); 096 String simpleName = parts[parts.length-1]; 097 Long i = counter.getAndIncrement(); 098 if (task.getId() == null || StringUtils.isEmpty(task.getId().toString())) { 099 String keyPart = empty(task.getCustomKey()) ? "" : "-" + task.getCustomKey(); 100 task.setId(Context.dal().getTickets().nextId()); 101 } 102 saveNewTask(task); 103 if (AsyncCoordinator.instance() != null && AsyncCoordinator.instance().isSynchronousMode()) { 104 AsyncCoordinator.instance().executeNext(); 105 } 106 } 107 108 @Override 109 public void updateTask(AsyncTask task, boolean executeAtChanged) { 110 AsyncTaskController.instance().save(task); 111 if (executeAtChanged) { 112 // Need to remove and add back in order to re-sort 113 // This could get expensive if the Queue is big 114 taskQueue.remove(task); 115 taskQueue.add(task); 116 } 117 } 118 119 @Override 120 public void saveNewTask(AsyncTask task) { 121 AsyncTaskController.instance().save(task); 122 Log.info("Adding task to the queue: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey()); 123 taskQueue.add(task); 124 125 } 126 127 @Override 128 public AsyncTask findAndLockNextTask(Long now) { 129 now = or(now, DateUtils.mils()); 130 if (getTaskQueue().size() == 0) { 131 return null; 132 } 133 AsyncTask task = getTaskQueue().poll(); 134 if (task == null) { 135 return null; 136 } 137 if (task.getExecuteAt() > now) { 138 getTaskQueue().put(task); 139 return null; 140 } 141 if (task.getLockedAt() > 0) { 142 // Locked by someone else, carry on 143 Log.warn("Task is already locked {0} {1} currentThread={2} lockUid={3}", task.getId(), task.getLockedAt(), Thread.currentThread().getId(), task.getLockUuid()); 144 return null; 145 } 146 147 148 Log.finer("Queue size is {0}", getTaskQueue().size()); 149 boolean locked = lockTaskForExecution(task); 150 if (!locked) { 151 Log.warn("Unable to lock task! {0}", task.getId()); 152 return null; 153 } 154 return task; 155 } 156 157 158 protected boolean lockTaskForExecution(AsyncTask task) { 159 if (seenTaskIds.contains(task.getId())) { 160 Log.warn("Trying to lock a task with an id that has already been seen! {0}", task.getId()); 161 return false; 162 } 163 seenTaskIds.add(task.getId()); 164 return getTaskPersister().lockForProcessing(task); 165 } 166 167 @Override 168 public boolean markCompleted(AsyncTask task) { 169 return getTaskPersister().markComplete(task); 170 } 171 172 @Override 173 public boolean markFailed(AsyncTask task, Throwable throwable) { 174 if (seenTaskIds.contains(task.getId())) { 175 seenTaskIds.remove(task.getId()); 176 } 177 return getTaskPersister().markFailed(task, throwable); 178 } 179 180 @Override 181 public AsyncTaskPersister getTaskPersister() { 182 return (AsyncTaskPersister) AsyncTaskController.instance().getPersister(); 183 } 184 185 public boolean hasSeenTask(String id) { 186 return seenTaskIds.contains(id); 187 } 188 189 public boolean hasTaskWithId(Long id) { 190 AsyncTask mirror = new AsyncTask(); 191 mirror.setId(id); 192 return taskQueue.contains(mirror); 193 } 194 195 @Override 196 public boolean hasPendingTaskWithCustomKey(String key) { 197 AsyncTask task = AsyncTaskController.instance().forUniqueKey("customKey", key); 198 if (task == null) { 199 return false; 200 } 201 if ((task.getExecuteAt() == 0 || task.getExecuteAt() > DateUtils.mils()) && task.getCompletedAt() < 1) { 202 return true; 203 } 204 return false; 205 } 206 207 @Override 208 public boolean hasTaskWithCustomKey(String key) { 209 AsyncTask task = AsyncTaskController.instance().forUniqueKey("customKey", key); 210 if (task == null) { 211 return false; 212 } 213 return true; 214 } 215 216 217 public PriorityBlockingQueue<AsyncTask> getTaskQueue() { 218 return taskQueue; 219 } 220 221 public void onLoadTaskOnBoot(AsyncTask task) { 222 if (!StringUtils.isEmpty(task.getId().toString()) && task.getCompletedAt() == 0 && task.getLockedAt() == 0) { 223 taskQueue.add(task); 224 } 225 } 226 227 @Override 228 public int getPendingTaskCount() { 229 return taskQueue.size(); 230 } 231 232}