001/*
002 * Stallion Core: A Modern Web Framework
003 *
004 * Copyright (C) 2015 - 2016 Stallion Software LLC.
005 *
006 * This program is free software: you can redistribute it and/or modify it under the terms of the
007 * GNU General Public License as published by the Free Software Foundation, either version 2 of
008 * the License, or (at your option) any later version. This program is distributed in the hope that
009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
011 * License for more details. You should have received a copy of the GNU General Public License
012 * along with this program.  If not, see <http://www.gnu.org/licenses/gpl-2.0.html>.
013 *
014 *
015 *
016 */
017
018package io.stallion.asyncTasks;
019
020import io.stallion.dataAccess.Model;
021import io.stallion.dataAccess.db.DB;
022import io.stallion.dataAccess.file.ItemFileChangeEventHandler;
023import io.stallion.dataAccess.file.JsonFilePersister;
024import io.stallion.fileSystem.FileSystemWatcherService;
025import io.stallion.services.Log;
026import io.stallion.utils.DateUtils;
027import org.apache.commons.io.filefilter.RegexFileFilter;
028import org.apache.commons.lang3.exception.ExceptionUtils;
029
030import java.io.File;
031import java.io.FileFilter;
032import java.time.ZonedDateTime;
033import java.util.ArrayList;
034import java.util.List;
035
036public class AsyncTaskFilePersister extends JsonFilePersister implements AsyncTaskPersister {
037
038    @Override
039    public List fetchAll() {
040        File[] folders = new File[]{
041                new File(getBucketFolderPath()),
042                new File(getBucketFolderPath() + "/locked"),
043                new File(getBucketFolderPath() + "/pending"),
044                new File(getBucketFolderPath() + "/failed"),
045                new File(getBucketFolderPath() + "/completed"),
046                new File(getBucketFolderPath() + "/manualRetry")
047        };
048        for(File folder: folders) {
049            if (!folder.isDirectory()) {
050                folder.mkdirs();
051            }
052        }
053        FileFilter fileFilter = new RegexFileFilter("\\d+\\d+\\d+\\d+.*\\.json");
054        File root = new File(getBucketFolderPath() + "/pending");
055        File[] files = root.listFiles(fileFilter);
056        List items = new ArrayList<>();
057        for (File file : files) {
058            items.add(fetchOne(file.getAbsolutePath()));
059        }
060        return items;
061    }
062
063
064    /**
065     * This callback occurs when the watcher finds a new file added to manual retry
066     * We load the task, delete the original file, then re-enqueue the task
067     * @param relativePath
068     */
069    @Override
070    public void watchEventCallback(String relativePath) {
071        Log.fine("file changed: {0}", relativePath);
072        String path = getBucketFolderPath() + "/manualRetry/" + relativePath;
073        File file = new File(path);
074        // If this is a result of a delete, event we return
075        if (!file.isFile()) {
076            return;
077        }
078        AsyncTask task = (AsyncTask)fetchOne(path);
079        // Reset all the errors so we will reprocess
080        task.setLockUuid("").setTryCount(0).setLockedAt(0).setFailedAt(0).setErrorMessage("");
081        boolean deleted = new File(path).delete();
082        if (deleted) {
083            AsyncCoordinator.instance().enqueue(task);
084        }
085    }
086
087    /**
088     * The watcher for JsonTaskPersister only watches the manual entry folder.
089     * If a task fails too many times, then we fix the problem, we need a way to force the
090     * failed tasks to be retried. The way to do this is to manually edit the JSON, then move
091     * the file into the manualRetry folder. The watcher will pick it up, and move it into the
092     * pending queue
093     */
094    @Override
095    public void attachWatcher()  {
096        String folderToWatch = getBucketFolderPath() + "/manualRetry";
097        if (!new File(folderToWatch).isDirectory()) {
098            new File(folderToWatch).mkdirs();
099        }
100        FileSystemWatcherService.instance().registerWatcher(
101                new ItemFileChangeEventHandler(this)
102                        .setExtension(".json")
103                        .setWatchedFolder(folderToWatch)
104                        .setWatchTree(false));
105    }
106
107
108    public boolean lockForProcessing(AsyncTask task)  {
109        Log.fine("Locking task for processing {0}", task.getId());
110        //String errMessage = String.format("Failed to lock task id=%s class=%s customKey=%s", task.getId(), task.getHandlerName(), task.getCustomKey();
111        if (task.getLockedAt() > 0) {
112            return false;
113        }
114        File file = new File(pathForPending(task));
115        File dest = new File(pathForLocked(task));
116        boolean succeeded = file.renameTo(dest);
117        if (!succeeded) {
118            return false;
119        }
120        task.setLockedAt(DateUtils.mils());
121        task.setLockUuid("thread-" + Thread.currentThread().getId());
122        persist(task);
123        return true;
124    }
125
126    public boolean markComplete(AsyncTask task) {
127        File file = new File(pathForLocked(task));
128        File dest = new File(pathForCompleted(task));
129        boolean succeeded = file.renameTo(dest);
130        if (!succeeded) {
131            return false;
132        }
133        task.setCompletedAt(DateUtils.mils());
134        persist(task);
135        return true;
136    }
137
138    public boolean markFailed(AsyncTask task, Throwable e) {
139        File file = new File(fullFilePathForObj(task));
140        task.setTryCount(task.getTryCount() + 1);
141
142        task.setErrorMessage(e.toString() + ExceptionUtils.getStackTrace(e));
143        if (task.getTryCount() >= 5) {
144            task.setFailedAt(DateUtils.mils());
145        } else {
146            task.setExecuteAt(DateUtils.mils() + ((2^task.getTryCount())*1000));
147            task.setLockedAt(0);
148            task.setLockUuid("");
149        }
150        File dest = new File(fullFilePathForObj(task));
151        boolean succeeded = file.renameTo(dest);
152        if (!succeeded) {
153            return false;
154        }
155        persist(task);
156        return true;
157    }
158
159    @Override
160    public String fullFilePathForObj(Model model) {
161        AsyncTask task = (AsyncTask)model;
162        String path;
163        if (task.getCompletedAt() > 0) {
164            path = pathForCompleted(task);
165        } else if (task.getFailedAt() > 0) {
166            path = pathForFailed(task);
167        } else if (task.getLockedAt() > 0) {
168            path = pathForLocked(task);
169        } else {
170            path = pathForPending(task);
171        }
172        return path;
173    }
174
175    @Override
176    public void deleteOldTasks() {
177        File dir = new File(getBucketFolderPath() + "/completed");
178        Long before = DateUtils.utcNow().minusDays(40).toInstant().toEpochMilli();
179        for(File f: dir.listFiles()) {
180            if (!f.isFile() || f.isHidden()) {
181                 continue;
182            }
183            if (f.getName().startsWith(".") || f.getName().startsWith("~") || f.getName().startsWith("#")) {
184                continue;
185            }
186            if (f.getName().endsWith(".json")) {
187                continue;
188            }
189            if (f.lastModified() < before) {
190                f.delete();
191            }
192        }
193
194    }
195
196    @Override
197    public String relativeFilePathForObj(Model model) {
198        AsyncTask task = (AsyncTask)model;
199        String path = fullFilePathForObj(model);
200        return path.replace(getBucketFolderPath(), "");
201    }
202
203    private String pathForCompleted(AsyncTask task) {
204        String path = getBucketFolderPath() + "/completed/";
205        path += task.getId() + ".json";
206        return path;
207    }
208
209    private String pathForPending(AsyncTask task) {
210        String path = getBucketFolderPath() + "/pending/";
211        path += task.getId() + ".json";
212        return path;
213    }
214
215    private String pathForLocked(AsyncTask task) {
216        String path = getBucketFolderPath() + "/locked/";
217        path += task.getId() + ".json";
218        return path;
219    }
220
221    private String pathForFailed(AsyncTask task) {
222        String path = getBucketFolderPath() + "/failed/";
223        path += task.getId() + ".json";
224        return path;
225    }
226
227
228}