001/*
002 * Stallion Core: A Modern Web Framework
003 *
004 * Copyright (C) 2015 - 2016 Stallion Software LLC.
005 *
006 * This program is free software: you can redistribute it and/or modify it under the terms of the
007 * GNU General Public License as published by the Free Software Foundation, either version 2 of
008 * the License, or (at your option) any later version. This program is distributed in the hope that
009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
011 * License for more details. You should have received a copy of the GNU General Public License
012 * along with this program.  If not, see <http://www.gnu.org/licenses/gpl-2.0.html>.
013 *
014 *
015 *
016 */
017
018package io.stallion.asyncTasks;
019
020import io.stallion.dataAccess.DataAccessRegistry;
021import io.stallion.dataAccess.db.DB;
022import io.stallion.exceptions.NotFoundException;
023import io.stallion.exceptions.UsageException;
024import io.stallion.jobs.JobCoordinator;
025import io.stallion.jobs.JobDefinition;
026import io.stallion.jobs.Schedule;
027import io.stallion.plugins.javascript.JsAsyncTaskHandler;
028import io.stallion.services.Log;
029import org.apache.commons.lang3.concurrent.BasicThreadFactory;
030
031import static io.stallion.utils.Literals.*;
032
033import java.util.ArrayList;
034import java.util.List;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import java.util.concurrent.TimeUnit;
038
039/**
040 * The AsyncCoordinator handles the running of asychronous tasks. The coordinator
041 * continuously polls for new tasks, and dispatches them to an executable pool. If
042 * the task fails, it is marked for a later retry.
043 *
044 */
045public abstract class AsyncCoordinator extends Thread {
046
047    private static AsyncCoordinator INSTANCE;
048
049    ExecutorService pool;
050    List<AsyncTaskExecuteRunnable> threads;
051    private boolean triggerShutDown = false;
052    private boolean synchronousMode = false;
053    private List<ClassLoader> extraClassLoaders = list();
054
055
056    public static AsyncCoordinator instance() {
057        return INSTANCE;
058    }
059
060
061
062    public static void initAndStart() {
063        init();
064        startup();
065    }
066
067    /**
068     * Load the coordinator class so that tasks and handlers can be registered, but
069     * don't actually execute any tasks.
070     */
071    public static void init() {
072        if (INSTANCE != null) {
073            throw new UsageException("You cannot init the AsyncCoordinator twice!");
074        }
075        if (DB.available()) {
076            INSTANCE = new AsyncDbCoordinator();
077            AsyncTaskController.registerDbBased();
078        } else {
079            INSTANCE = new AsyncFileCoordinator();
080            AsyncTaskController.registerFileBased();
081        }
082        INSTANCE.setName("stallion-async-coordinator-thread");
083    }
084
085    public static void initEphemeralSynchronousForTests() {
086        if (INSTANCE != null) {
087            throw new RuntimeException("You cannot start the AsyncCoordinator twice!");
088        }
089        if (AsyncTaskController.instance() == null) {
090            AsyncTaskController.registerEphemeral();
091        }
092        AsyncCoordinator thread;
093
094        thread = new AsyncFileCoordinator();
095
096        thread.synchronousMode = true;
097        INSTANCE = thread;
098    }
099
100    public static void initForTests() {
101
102        if (INSTANCE != null) {
103            throw new RuntimeException("You cannot start the AsyncCoordinator twice!");
104        }
105        if (AsyncTaskController.instance() == null) {
106            AsyncTaskController.registerEphemeral();
107        }
108        AsyncCoordinator thread = new AsyncFileCoordinator();
109        thread.synchronousMode = true;
110        INSTANCE = thread;
111
112    }
113
114    /**
115     * Actually start the loop to poll for tasks and execute them.
116     */
117    public static void startup() {
118
119        if (INSTANCE.synchronousMode) {
120            return;
121        }
122        if (INSTANCE.isAlive()) {
123            throw new RuntimeException("You cannot start the AsyncCoordinator twice!");
124        }
125        Log.fine("Starting async coordinator.");
126
127
128        INSTANCE.start();
129
130        // Register the cleanup job
131        JobCoordinator.instance().registerJob(
132                new JobDefinition()
133                        .setJobClass(CleanupOldTasksJob.class)
134                .setSchedule(Schedule.daily())
135                .setAlertThresholdMinutes(30 * 60)
136                .setName("cleanup-completed-tasks")
137        );
138
139    }
140
141
142
143
144    protected AsyncCoordinator() {
145        threads = new ArrayList<>();
146        int poolSize = 4;
147        BasicThreadFactory factory = new BasicThreadFactory.Builder()
148                .namingPattern("stallion-async-task-runnable-%d")
149                .build();
150        // Create an executor service for single-threaded execution
151        pool = Executors.newFixedThreadPool(poolSize, factory);
152
153    }
154
155
156
157    public void run() {
158        while (!triggerShutDown) {
159            boolean taskExecuted = false;
160            try {
161                taskExecuted = executeNext(mils());
162            } catch (Exception e) {
163                Log.exception(e, "Error in main sync loop");
164            }
165            if (!taskExecuted) {
166                try {
167                    Thread.sleep(1000);
168                } catch(InterruptedException e) {
169
170                }
171            }
172        }
173        Log.finer("mark all async threads for shutdown");
174        for (AsyncTaskExecuteRunnable thread: threads) {
175            thread.setTriggerShutdown(true);
176        }
177        Log.finer("shutting down the pool");
178        pool.shutdownNow();
179        try {
180            while (!pool.awaitTermination(100, TimeUnit.MILLISECONDS)) {
181                Log.finer("wating for all async threads to terminate");
182            }
183        } catch (InterruptedException e) {
184            e.printStackTrace();
185        }
186    }
187
188    /**
189     * Execute the text in the queue, if any exists.
190     *
191     * @return
192     */
193    public boolean executeNext() {
194        return executeNext(mils());
195    }
196
197    /**
198     * Execute the next task, passing in the current time. For purposes of running
199     * tests, you can pass in any time you want.
200     *
201     * @param now - milliseconds since the epoch.
202     * @return
203     */
204    public boolean executeNext(Long now) {
205        AsyncTask task = findAndLockNextTask(now);
206        if (task == null) {
207            return false;
208        }
209        AsyncTaskExecuteRunnable runnable = new AsyncTaskExecuteRunnable(task);
210
211        if (isSynchronousMode()) {
212            runnable.run(true);
213        } else {
214            pool.submit(runnable);
215        }
216        return true;
217    }
218
219
220    /**
221     * Register a handler that will run tasks of a given name.
222     *
223     * @param name
224     * @param cls
225     */
226    public void registerHandler(String name, Class cls) {
227        AsyncTaskExecuteRunnable.registerClass(name, cls);
228    }
229
230    /**
231     * Register a handler that will run tasks of a given name.
232     *
233     * @param handler
234     */
235    public void registerHandler(Class<JsAsyncTaskHandler> handler) {
236        JsAsyncTaskHandler instance = null;
237        try {
238            instance = handler.newInstance();
239        } catch (InstantiationException e) {
240            throw new RuntimeException(e);
241        } catch (IllegalAccessException e) {
242            throw new RuntimeException(e);
243        }
244        AsyncTaskExecuteRunnable.registerClass(instance.getHandlerClassName(), handler);
245    }
246
247    /**
248     * Find the next task that is unlocked and ready for execution, lock it, and return it.
249     *
250     * @param now
251     * @return
252     */
253    protected abstract AsyncTask findAndLockNextTask(Long now);
254
255    /**
256     * Save a new task to the data store
257     *
258     * @param task
259     */
260    protected abstract void saveNewTask(AsyncTask task);
261
262    /**
263     * Update a task with new information.
264     *
265     * @param task
266     * @param executeAtChanged
267     */
268    public abstract void updateTask(AsyncTask task, boolean executeAtChanged);
269
270    /**
271     * Mark the task as completed.
272     *
273     * @param task
274     * @return
275     */
276    public abstract boolean markCompleted(AsyncTask task);
277
278
279    /**
280     * Directly, synchronously execute the task with the given ID, regardless
281     * of whether it has errored out or is scheduled for the future.
282     *
283     * @param taskId
284     * @param force -- if true, will run the job even if it is locked
285     */
286    public void runTaskForId(Long taskId, boolean force) {
287        AsyncTask task = AsyncTaskController.instance().forId(taskId);
288        if (task == null) {
289            throw new NotFoundException("Task not found for id " + taskId);
290        }
291        if (task.getLockedAt() > 0) {
292            Log.warn("Task is already locked {0} {1} currentThread={2} lockUid={3}", task.getId(), task.getLockedAt(), Thread.currentThread().getId(), task.getLockUuid());
293            if (!force) {
294                return;
295            }
296        }
297        boolean locked = getTaskPersister().lockForProcessing(task);
298        if (!locked) {
299            Log.warn("Unable to lock task! {0}", task.getId());
300            if (!force) {
301                return;
302            }
303        }
304        AsyncTaskExecuteRunnable runnable = new AsyncTaskExecuteRunnable(task);
305        runnable.run(true);
306    }
307
308
309    /**
310     * Mark the task as failed.
311     *
312     * @param task
313     * @param throwable
314     * @return
315     */
316    public abstract boolean markFailed(AsyncTask task, Throwable throwable);
317
318    /**
319     * Enqueue a new task runner; a task object will automatically be created for
320     * this handler.
321     *
322     * @param handler
323     * @return
324     */
325    public abstract AsyncTask enqueue(AsyncTaskHandler handler);
326
327    /**
328     * Enque a new task handler. CustomKey should be a globally unique key. It can be later
329     * used to update the task, or to prevent double enqueuing. For instance, if you were
330     * writing a calendar application, you might create a task to notify people 1 hour
331     * before each event. You could create a customKey with the user and event id in it.
332     * Then if the user changes the event time, you can use the customKey to find the task,
333     * and update it with a new executeAt time.
334     *
335     *
336     * @param handler - the task handler that will be executed.
337     * @param customKey - a user generated unique key that allows you to prevent dupes or later update the task
338     * @param executeAt - when you want the task to execute, in epoch milliseconds
339     * @return
340     */
341    public abstract AsyncTask enqueue(AsyncTaskHandler handler, String customKey, long executeAt);
342
343    /**
344     * Enqueue a task object, normally you enqueue using task handler, which will then
345     * call this method.
346     *
347     * @param task
348     */
349    public abstract void enqueue(AsyncTask task);
350
351    /**
352     * Called when a task is loaded from the data store during the boot phase.
353     *
354     * @param task
355     */
356    public abstract void onLoadTaskOnBoot(AsyncTask task);
357
358    /**
359     * Get the number of tasks waiting to be executed.
360     * @return
361     */
362    public abstract int getPendingTaskCount();
363
364    public abstract AsyncTaskPersister getTaskPersister();
365
366    /**
367     * Check to see if the task with the given id exists in the queue
368     * @param taskId
369     * @return
370     */
371    public abstract boolean hasTaskWithId(Long taskId);
372
373    /**
374     * Has an unexecuted task with the given custom key
375     * @param key
376     * @return
377     */
378    public abstract boolean hasPendingTaskWithCustomKey(String key);
379
380    /**
381     * Has any task, pending or already run, with the given custom key
382     * @param key
383     * @return
384     */
385    public abstract boolean hasTaskWithCustomKey(String key);
386
387    public static void shutDownForTests() {
388        for(AsyncTaskExecuteRunnable runnable: INSTANCE.threads) {
389            runnable.setTriggerShutdown(true);
390        }
391        DataAccessRegistry.instance().deregister("async_tasks");
392        INSTANCE.pool.shutdown();
393        INSTANCE = null;
394    }
395
396    /**
397     * Shutdown, while waiting for all task handlers to finish executing.
398     */
399    public static void gracefulShutdown() {
400        if (INSTANCE == null) {
401            return;
402        }
403        INSTANCE.triggerShutDown = true;
404        for(AsyncTaskExecuteRunnable runnable: INSTANCE.threads) {
405            runnable.setTriggerShutdown(true);
406        }
407        INSTANCE.pool.shutdown();
408        INSTANCE.interrupt();
409
410        int maxMinutesToWait = 5;
411        int xMax = maxMinutesToWait*60*10;
412        Log.info("Waiting for all async stuff to terminate.");
413        for (int x=0; x<(xMax+10);x++) {
414            if (INSTANCE.pool.isTerminated()) {
415                break;
416            }
417            if (x % 5000 == 0) {
418                Log.info("Waiting for all async tasks to finish before shutting down.");
419            }
420            if (x > xMax) {
421                Log.warn("Tasks still running waiting {0} minutes! Doing hard shutdown.", maxMinutesToWait);
422                break;
423            }
424            try {
425                Thread.sleep(100);
426            } catch(InterruptedException e){
427
428            }
429
430        }
431        INSTANCE = null;
432    }
433
434
435    /**
436     * true if is in test mode, where we run tests synchronously rather than in a background thread
437     * @return
438     */
439    public boolean isSynchronousMode() {
440        return synchronousMode;
441    }
442
443
444    public void registerClassLoader(ClassLoader loader) {
445        getExtraClassLoaders().add(loader);
446    }
447
448    public List<ClassLoader> getExtraClassLoaders() {
449        return extraClassLoaders;
450    }
451
452
453}