001/* 002 * Stallion Core: A Modern Web Framework 003 * 004 * Copyright (C) 2015 - 2016 Stallion Software LLC. 005 * 006 * This program is free software: you can redistribute it and/or modify it under the terms of the 007 * GNU General Public License as published by the Free Software Foundation, either version 2 of 008 * the License, or (at your option) any later version. This program is distributed in the hope that 009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of 010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public 011 * License for more details. You should have received a copy of the GNU General Public License 012 * along with this program. If not, see <http://www.gnu.org/licenses/gpl-2.0.html>. 013 * 014 * 015 * 016 */ 017 018package io.stallion.asyncTasks; 019 020import io.stallion.dataAccess.DataAccessRegistry; 021import io.stallion.dataAccess.db.DB; 022import io.stallion.exceptions.NotFoundException; 023import io.stallion.exceptions.UsageException; 024import io.stallion.jobs.JobCoordinator; 025import io.stallion.jobs.JobDefinition; 026import io.stallion.jobs.Schedule; 027import io.stallion.plugins.javascript.JsAsyncTaskHandler; 028import io.stallion.services.Log; 029import org.apache.commons.lang3.concurrent.BasicThreadFactory; 030 031import static io.stallion.utils.Literals.*; 032 033import java.util.ArrayList; 034import java.util.List; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Executors; 037import java.util.concurrent.TimeUnit; 038 039/** 040 * The AsyncCoordinator handles the running of asychronous tasks. The coordinator 041 * continuously polls for new tasks, and dispatches them to an executable pool. If 042 * the task fails, it is marked for a later retry. 043 * 044 */ 045public abstract class AsyncCoordinator extends Thread { 046 047 private static AsyncCoordinator INSTANCE; 048 049 ExecutorService pool; 050 List<AsyncTaskExecuteRunnable> threads; 051 private boolean triggerShutDown = false; 052 private boolean synchronousMode = false; 053 private List<ClassLoader> extraClassLoaders = list(); 054 055 056 public static AsyncCoordinator instance() { 057 return INSTANCE; 058 } 059 060 061 062 public static void initAndStart() { 063 init(); 064 startup(); 065 } 066 067 /** 068 * Load the coordinator class so that tasks and handlers can be registered, but 069 * don't actually execute any tasks. 070 */ 071 public static void init() { 072 if (INSTANCE != null) { 073 throw new UsageException("You cannot init the AsyncCoordinator twice!"); 074 } 075 if (DB.available()) { 076 INSTANCE = new AsyncDbCoordinator(); 077 AsyncTaskController.registerDbBased(); 078 } else { 079 INSTANCE = new AsyncFileCoordinator(); 080 AsyncTaskController.registerFileBased(); 081 } 082 INSTANCE.setName("stallion-async-coordinator-thread"); 083 } 084 085 public static void initEphemeralSynchronousForTests() { 086 if (INSTANCE != null) { 087 throw new RuntimeException("You cannot start the AsyncCoordinator twice!"); 088 } 089 if (AsyncTaskController.instance() == null) { 090 AsyncTaskController.registerEphemeral(); 091 } 092 AsyncCoordinator thread; 093 094 thread = new AsyncFileCoordinator(); 095 096 thread.synchronousMode = true; 097 INSTANCE = thread; 098 } 099 100 public static void initForTests() { 101 102 if (INSTANCE != null) { 103 throw new RuntimeException("You cannot start the AsyncCoordinator twice!"); 104 } 105 if (AsyncTaskController.instance() == null) { 106 AsyncTaskController.registerEphemeral(); 107 } 108 AsyncCoordinator thread = new AsyncFileCoordinator(); 109 thread.synchronousMode = true; 110 INSTANCE = thread; 111 112 } 113 114 /** 115 * Actually start the loop to poll for tasks and execute them. 116 */ 117 public static void startup() { 118 119 if (INSTANCE.synchronousMode) { 120 return; 121 } 122 if (INSTANCE.isAlive()) { 123 throw new RuntimeException("You cannot start the AsyncCoordinator twice!"); 124 } 125 Log.fine("Starting async coordinator."); 126 127 128 INSTANCE.start(); 129 130 // Register the cleanup job 131 JobCoordinator.instance().registerJob( 132 new JobDefinition() 133 .setJobClass(CleanupOldTasksJob.class) 134 .setSchedule(Schedule.daily()) 135 .setAlertThresholdMinutes(30 * 60) 136 .setName("cleanup-completed-tasks") 137 ); 138 139 } 140 141 142 143 144 protected AsyncCoordinator() { 145 threads = new ArrayList<>(); 146 int poolSize = 4; 147 BasicThreadFactory factory = new BasicThreadFactory.Builder() 148 .namingPattern("stallion-async-task-runnable-%d") 149 .build(); 150 // Create an executor service for single-threaded execution 151 pool = Executors.newFixedThreadPool(poolSize, factory); 152 153 } 154 155 156 157 public void run() { 158 while (!triggerShutDown) { 159 boolean taskExecuted = false; 160 try { 161 taskExecuted = executeNext(mils()); 162 } catch (Exception e) { 163 Log.exception(e, "Error in main sync loop"); 164 } 165 if (!taskExecuted) { 166 try { 167 Thread.sleep(1000); 168 } catch(InterruptedException e) { 169 170 } 171 } 172 } 173 Log.finer("mark all async threads for shutdown"); 174 for (AsyncTaskExecuteRunnable thread: threads) { 175 thread.setTriggerShutdown(true); 176 } 177 Log.finer("shutting down the pool"); 178 pool.shutdownNow(); 179 try { 180 while (!pool.awaitTermination(100, TimeUnit.MILLISECONDS)) { 181 Log.finer("wating for all async threads to terminate"); 182 } 183 } catch (InterruptedException e) { 184 e.printStackTrace(); 185 } 186 } 187 188 /** 189 * Execute the text in the queue, if any exists. 190 * 191 * @return 192 */ 193 public boolean executeNext() { 194 return executeNext(mils()); 195 } 196 197 /** 198 * Execute the next task, passing in the current time. For purposes of running 199 * tests, you can pass in any time you want. 200 * 201 * @param now - milliseconds since the epoch. 202 * @return 203 */ 204 public boolean executeNext(Long now) { 205 AsyncTask task = findAndLockNextTask(now); 206 if (task == null) { 207 return false; 208 } 209 AsyncTaskExecuteRunnable runnable = new AsyncTaskExecuteRunnable(task); 210 211 if (isSynchronousMode()) { 212 runnable.run(true); 213 } else { 214 pool.submit(runnable); 215 } 216 return true; 217 } 218 219 220 /** 221 * Register a handler that will run tasks of a given name. 222 * 223 * @param name 224 * @param cls 225 */ 226 public void registerHandler(String name, Class cls) { 227 AsyncTaskExecuteRunnable.registerClass(name, cls); 228 } 229 230 /** 231 * Register a handler that will run tasks of a given name. 232 * 233 * @param handler 234 */ 235 public void registerHandler(Class<JsAsyncTaskHandler> handler) { 236 JsAsyncTaskHandler instance = null; 237 try { 238 instance = handler.newInstance(); 239 } catch (InstantiationException e) { 240 throw new RuntimeException(e); 241 } catch (IllegalAccessException e) { 242 throw new RuntimeException(e); 243 } 244 AsyncTaskExecuteRunnable.registerClass(instance.getHandlerClassName(), handler); 245 } 246 247 /** 248 * Find the next task that is unlocked and ready for execution, lock it, and return it. 249 * 250 * @param now 251 * @return 252 */ 253 protected abstract AsyncTask findAndLockNextTask(Long now); 254 255 /** 256 * Save a new task to the data store 257 * 258 * @param task 259 */ 260 protected abstract void saveNewTask(AsyncTask task); 261 262 /** 263 * Update a task with new information. 264 * 265 * @param task 266 * @param executeAtChanged 267 */ 268 public abstract void updateTask(AsyncTask task, boolean executeAtChanged); 269 270 /** 271 * Mark the task as completed. 272 * 273 * @param task 274 * @return 275 */ 276 public abstract boolean markCompleted(AsyncTask task); 277 278 279 /** 280 * Directly, synchronously execute the task with the given ID, regardless 281 * of whether it has errored out or is scheduled for the future. 282 * 283 * @param taskId 284 * @param force -- if true, will run the job even if it is locked 285 */ 286 public void runTaskForId(Long taskId, boolean force) { 287 AsyncTask task = AsyncTaskController.instance().forId(taskId); 288 if (task == null) { 289 throw new NotFoundException("Task not found for id " + taskId); 290 } 291 if (task.getLockedAt() > 0) { 292 Log.warn("Task is already locked {0} {1} currentThread={2} lockUid={3}", task.getId(), task.getLockedAt(), Thread.currentThread().getId(), task.getLockUuid()); 293 if (!force) { 294 return; 295 } 296 } 297 boolean locked = getTaskPersister().lockForProcessing(task); 298 if (!locked) { 299 Log.warn("Unable to lock task! {0}", task.getId()); 300 if (!force) { 301 return; 302 } 303 } 304 AsyncTaskExecuteRunnable runnable = new AsyncTaskExecuteRunnable(task); 305 runnable.run(true); 306 } 307 308 309 /** 310 * Mark the task as failed. 311 * 312 * @param task 313 * @param throwable 314 * @return 315 */ 316 public abstract boolean markFailed(AsyncTask task, Throwable throwable); 317 318 /** 319 * Enqueue a new task runner; a task object will automatically be created for 320 * this handler. 321 * 322 * @param handler 323 * @return 324 */ 325 public abstract AsyncTask enqueue(AsyncTaskHandler handler); 326 327 /** 328 * Enque a new task handler. CustomKey should be a globally unique key. It can be later 329 * used to update the task, or to prevent double enqueuing. For instance, if you were 330 * writing a calendar application, you might create a task to notify people 1 hour 331 * before each event. You could create a customKey with the user and event id in it. 332 * Then if the user changes the event time, you can use the customKey to find the task, 333 * and update it with a new executeAt time. 334 * 335 * 336 * @param handler - the task handler that will be executed. 337 * @param customKey - a user generated unique key that allows you to prevent dupes or later update the task 338 * @param executeAt - when you want the task to execute, in epoch milliseconds 339 * @return 340 */ 341 public abstract AsyncTask enqueue(AsyncTaskHandler handler, String customKey, long executeAt); 342 343 /** 344 * Enqueue a task object, normally you enqueue using task handler, which will then 345 * call this method. 346 * 347 * @param task 348 */ 349 public abstract void enqueue(AsyncTask task); 350 351 /** 352 * Called when a task is loaded from the data store during the boot phase. 353 * 354 * @param task 355 */ 356 public abstract void onLoadTaskOnBoot(AsyncTask task); 357 358 /** 359 * Get the number of tasks waiting to be executed. 360 * @return 361 */ 362 public abstract int getPendingTaskCount(); 363 364 public abstract AsyncTaskPersister getTaskPersister(); 365 366 /** 367 * Check to see if the task with the given id exists in the queue 368 * @param taskId 369 * @return 370 */ 371 public abstract boolean hasTaskWithId(Long taskId); 372 373 /** 374 * Has an unexecuted task with the given custom key 375 * @param key 376 * @return 377 */ 378 public abstract boolean hasPendingTaskWithCustomKey(String key); 379 380 /** 381 * Has any task, pending or already run, with the given custom key 382 * @param key 383 * @return 384 */ 385 public abstract boolean hasTaskWithCustomKey(String key); 386 387 public static void shutDownForTests() { 388 for(AsyncTaskExecuteRunnable runnable: INSTANCE.threads) { 389 runnable.setTriggerShutdown(true); 390 } 391 DataAccessRegistry.instance().deregister("async_tasks"); 392 INSTANCE.pool.shutdown(); 393 INSTANCE = null; 394 } 395 396 /** 397 * Shutdown, while waiting for all task handlers to finish executing. 398 */ 399 public static void gracefulShutdown() { 400 if (INSTANCE == null) { 401 return; 402 } 403 INSTANCE.triggerShutDown = true; 404 for(AsyncTaskExecuteRunnable runnable: INSTANCE.threads) { 405 runnable.setTriggerShutdown(true); 406 } 407 INSTANCE.pool.shutdown(); 408 INSTANCE.interrupt(); 409 410 int maxMinutesToWait = 5; 411 int xMax = maxMinutesToWait*60*10; 412 Log.info("Waiting for all async stuff to terminate."); 413 for (int x=0; x<(xMax+10);x++) { 414 if (INSTANCE.pool.isTerminated()) { 415 break; 416 } 417 if (x % 5000 == 0) { 418 Log.info("Waiting for all async tasks to finish before shutting down."); 419 } 420 if (x > xMax) { 421 Log.warn("Tasks still running waiting {0} minutes! Doing hard shutdown.", maxMinutesToWait); 422 break; 423 } 424 try { 425 Thread.sleep(100); 426 } catch(InterruptedException e){ 427 428 } 429 430 } 431 INSTANCE = null; 432 } 433 434 435 /** 436 * true if is in test mode, where we run tests synchronously rather than in a background thread 437 * @return 438 */ 439 public boolean isSynchronousMode() { 440 return synchronousMode; 441 } 442 443 444 public void registerClassLoader(ClassLoader loader) { 445 getExtraClassLoaders().add(loader); 446 } 447 448 public List<ClassLoader> getExtraClassLoaders() { 449 return extraClassLoaders; 450 } 451 452 453}