001/*
002 * Stallion Core: A Modern Web Framework
003 *
004 * Copyright (C) 2015 - 2016 Stallion Software LLC.
005 *
006 * This program is free software: you can redistribute it and/or modify it under the terms of the
007 * GNU General Public License as published by the Free Software Foundation, either version 2 of
008 * the License, or (at your option) any later version. This program is distributed in the hope that
009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
011 * License for more details. You should have received a copy of the GNU General Public License
012 * along with this program.  If not, see <http://www.gnu.org/licenses/gpl-2.0.html>.
013 *
014 *
015 *
016 */
017
018package io.stallion.asyncTasks;
019
020import io.stallion.Context;
021import io.stallion.exceptions.AppException;
022import io.stallion.services.Log;
023import io.stallion.utils.DateUtils;
024import org.apache.commons.lang3.StringUtils;
025
026import java.util.HashSet;
027import java.util.concurrent.PriorityBlockingQueue;
028import java.util.concurrent.atomic.AtomicLong;
029
030import static io.stallion.utils.Literals.*;
031
032
033public class AsyncFileCoordinator extends AsyncCoordinator {
034
035    private AtomicLong counter = new AtomicLong(1);
036    private PriorityBlockingQueue<AsyncTask> taskQueue = new PriorityBlockingQueue<>();
037    private HashSet<Object> seenTaskIds = new HashSet<>();
038
039
040
041
042    @Override
043    public AsyncTask enqueue(AsyncTaskHandler handler) {
044        return enqueue(handler, null, 0);
045    }
046
047    @Override
048    public AsyncTask enqueue(AsyncTaskHandler handler, String customKey, long executeAt) {
049        AsyncTask task = new AsyncTask(handler, customKey, executeAt);
050        enqueue(task);
051        return task;
052    }
053
054    @Override
055    public void enqueue(AsyncTask task) {
056        if (StringUtils.isEmpty(task.getHandlerName())) {
057            throw new AppException("A task was enqueued, but getHandlerName() was blank");
058        }
059
060        //TODO: If the task handler is not registered, then add it
061        //TODO: If it does not exist, raise a ConfigException
062
063        if (task.getOriginallyScheduledFor() > 0) {
064            task.setExecuteAt(task.getOriginallyScheduledFor());
065        } else if (!empty(task.getExecuteAt()) && empty(task.getOriginallyScheduledFor())) {
066            task.setOriginallyScheduledFor(task.getExecuteAt());
067        }
068        if (!StringUtils.isEmpty(task.getCustomKey())) {
069            try {
070                AsyncTask existing = AsyncTaskController.instance().forUniqueKey("customKey", task.getCustomKey());
071                if (existing != null && existing.getCompletedAt() > 0) {
072                    Log.info("Existing task already ran with customKey={0}", task.getCustomKey());
073                    return;
074                } else if (existing != null) {
075                    existing.setDataJson(task.getDataJson());
076                    Boolean executeAtChanged = false;
077                    if (task.getExecuteAt() > 0 && existing.getTryCount() == 0 && existing.getExecuteAt() != task.getExecuteAt()) {
078                        existing.setExecuteAt(task.getExecuteAt());
079                        existing.setOriginallyScheduledFor(task.getOriginallyScheduledFor());
080                        executeAtChanged = true;
081                    }
082                    existing.setHandlerName(task.getHandlerName());
083                    updateTask(existing, executeAtChanged);
084                    Log.info("Updating existing task with customKey={0}", task.getCustomKey());
085                    return;
086                }
087            } catch (Exception e) {
088                throw new RuntimeException(e);
089            }
090        }
091        if (task.getExecuteAt() == 0) {
092            // Set to the past, so will execute immediately even if picked up by another server with a slower clock
093            task.setExecuteAt(DateUtils.mils() - 15000);
094        }
095        String[] parts = task.getHandlerName().split("\\.");
096        String simpleName = parts[parts.length-1];
097        Long i = counter.getAndIncrement();
098        if (task.getId() == null || StringUtils.isEmpty(task.getId().toString())) {
099            String keyPart = empty(task.getCustomKey()) ? "" : "-" + task.getCustomKey();
100            task.setId(Context.dal().getTickets().nextId());
101        }
102        saveNewTask(task);
103        if (AsyncCoordinator.instance() != null && AsyncCoordinator.instance().isSynchronousMode()) {
104            AsyncCoordinator.instance().executeNext();
105        }
106    }
107
108    @Override
109    public void updateTask(AsyncTask task, boolean executeAtChanged) {
110        AsyncTaskController.instance().save(task);
111        if (executeAtChanged) {
112            // Need to remove and add back in order to re-sort
113            // This could get expensive if the Queue is big
114            taskQueue.remove(task);
115            taskQueue.add(task);
116        }
117    }
118
119    @Override
120    public void saveNewTask(AsyncTask task) {
121        AsyncTaskController.instance().save(task);
122        Log.info("Adding task to the queue: id={0} handler={1} customKey={2}", task.getId(), task.getHandlerName(), task.getCustomKey());
123        taskQueue.add(task);
124
125    }
126
127    @Override
128    public AsyncTask findAndLockNextTask(Long now) {
129        now = or(now, DateUtils.mils());
130        if (getTaskQueue().size() == 0) {
131            return null;
132        }
133        AsyncTask task = getTaskQueue().poll();
134        if (task == null) {
135            return null;
136        }
137        if (task.getExecuteAt() > now) {
138            getTaskQueue().put(task);
139            return null;
140        }
141        if (task.getLockedAt() > 0) {
142            // Locked by someone else, carry on
143            Log.warn("Task is already locked {0} {1} currentThread={2} lockUid={3}", task.getId(), task.getLockedAt(), Thread.currentThread().getId(), task.getLockUuid());
144            return null;
145        }
146
147
148        Log.finer("Queue size is {0}", getTaskQueue().size());
149        boolean locked = lockTaskForExecution(task);
150        if (!locked) {
151            Log.warn("Unable to lock task! {0}", task.getId());
152            return null;
153        }
154        return task;
155    }
156
157
158    protected boolean lockTaskForExecution(AsyncTask task) {
159        if (seenTaskIds.contains(task.getId())) {
160            Log.warn("Trying to lock a task with an id that has already been seen! {0}", task.getId());
161            return false;
162        }
163        seenTaskIds.add(task.getId());
164        return getTaskPersister().lockForProcessing(task);
165    }
166
167    @Override
168    public boolean markCompleted(AsyncTask task) {
169        return getTaskPersister().markComplete(task);
170    }
171
172    @Override
173    public boolean markFailed(AsyncTask task, Throwable throwable) {
174        if (seenTaskIds.contains(task.getId())) {
175            seenTaskIds.remove(task.getId());
176        }
177        return getTaskPersister().markFailed(task, throwable);
178    }
179
180    @Override
181    public AsyncTaskPersister getTaskPersister() {
182        return (AsyncTaskPersister) AsyncTaskController.instance().getPersister();
183    }
184
185    public boolean hasSeenTask(String id) {
186        return seenTaskIds.contains(id);
187    }
188
189    public boolean hasTaskWithId(Long id) {
190        AsyncTask mirror = new AsyncTask();
191        mirror.setId(id);
192        return taskQueue.contains(mirror);
193    }
194
195    @Override
196    public boolean hasPendingTaskWithCustomKey(String key) {
197        AsyncTask task = AsyncTaskController.instance().forUniqueKey("customKey", key);
198        if (task == null) {
199            return false;
200        }
201        if ((task.getExecuteAt() == 0 || task.getExecuteAt() > DateUtils.mils()) && task.getCompletedAt() < 1) {
202            return true;
203        }
204        return false;
205    }
206
207    @Override
208    public boolean hasTaskWithCustomKey(String key) {
209        AsyncTask task = AsyncTaskController.instance().forUniqueKey("customKey", key);
210        if (task == null) {
211            return false;
212        }
213        return true;
214    }
215
216
217    public PriorityBlockingQueue<AsyncTask> getTaskQueue() {
218        return taskQueue;
219    }
220
221    public void onLoadTaskOnBoot(AsyncTask task) {
222        if (!StringUtils.isEmpty(task.getId().toString()) && task.getCompletedAt() == 0 && task.getLockedAt() == 0) {
223            taskQueue.add(task);
224        }
225    }
226
227    @Override
228    public int getPendingTaskCount() {
229        return taskQueue.size();
230    }
231
232}