001/* 002 * Stallion Core: A Modern Web Framework 003 * 004 * Copyright (C) 2015 - 2016 Stallion Software LLC. 005 * 006 * This program is free software: you can redistribute it and/or modify it under the terms of the 007 * GNU General Public License as published by the Free Software Foundation, either version 2 of 008 * the License, or (at your option) any later version. This program is distributed in the hope that 009 * it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of 010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public 011 * License for more details. You should have received a copy of the GNU General Public License 012 * along with this program. If not, see <http://www.gnu.org/licenses/gpl-2.0.html>. 013 * 014 * 015 * 016 */ 017 018package io.stallion.asyncTasks; 019 020import io.stallion.dataAccess.Model; 021import io.stallion.dataAccess.db.DB; 022import io.stallion.dataAccess.file.ItemFileChangeEventHandler; 023import io.stallion.dataAccess.file.JsonFilePersister; 024import io.stallion.fileSystem.FileSystemWatcherService; 025import io.stallion.services.Log; 026import io.stallion.utils.DateUtils; 027import org.apache.commons.io.filefilter.RegexFileFilter; 028import org.apache.commons.lang3.exception.ExceptionUtils; 029 030import java.io.File; 031import java.io.FileFilter; 032import java.time.ZonedDateTime; 033import java.util.ArrayList; 034import java.util.List; 035 036public class AsyncTaskFilePersister extends JsonFilePersister implements AsyncTaskPersister { 037 038 @Override 039 public List fetchAll() { 040 File[] folders = new File[]{ 041 new File(getBucketFolderPath()), 042 new File(getBucketFolderPath() + "/locked"), 043 new File(getBucketFolderPath() + "/pending"), 044 new File(getBucketFolderPath() + "/failed"), 045 new File(getBucketFolderPath() + "/completed"), 046 new File(getBucketFolderPath() + "/manualRetry") 047 }; 048 for(File folder: folders) { 049 if (!folder.isDirectory()) { 050 folder.mkdirs(); 051 } 052 } 053 FileFilter fileFilter = new RegexFileFilter("\\d+\\d+\\d+\\d+.*\\.json"); 054 File root = new File(getBucketFolderPath() + "/pending"); 055 File[] files = root.listFiles(fileFilter); 056 List items = new ArrayList<>(); 057 for (File file : files) { 058 items.add(fetchOne(file.getAbsolutePath())); 059 } 060 return items; 061 } 062 063 064 /** 065 * This callback occurs when the watcher finds a new file added to manual retry 066 * We load the task, delete the original file, then re-enqueue the task 067 * @param relativePath 068 */ 069 @Override 070 public void watchEventCallback(String relativePath) { 071 Log.fine("file changed: {0}", relativePath); 072 String path = getBucketFolderPath() + "/manualRetry/" + relativePath; 073 File file = new File(path); 074 // If this is a result of a delete, event we return 075 if (!file.isFile()) { 076 return; 077 } 078 AsyncTask task = (AsyncTask)fetchOne(path); 079 // Reset all the errors so we will reprocess 080 task.setLockUuid("").setTryCount(0).setLockedAt(0).setFailedAt(0).setErrorMessage(""); 081 boolean deleted = new File(path).delete(); 082 if (deleted) { 083 AsyncCoordinator.instance().enqueue(task); 084 } 085 } 086 087 /** 088 * The watcher for JsonTaskPersister only watches the manual entry folder. 089 * If a task fails too many times, then we fix the problem, we need a way to force the 090 * failed tasks to be retried. The way to do this is to manually edit the JSON, then move 091 * the file into the manualRetry folder. The watcher will pick it up, and move it into the 092 * pending queue 093 */ 094 @Override 095 public void attachWatcher() { 096 String folderToWatch = getBucketFolderPath() + "/manualRetry"; 097 if (!new File(folderToWatch).isDirectory()) { 098 new File(folderToWatch).mkdirs(); 099 } 100 FileSystemWatcherService.instance().registerWatcher( 101 new ItemFileChangeEventHandler(this) 102 .setExtension(".json") 103 .setWatchedFolder(folderToWatch) 104 .setWatchTree(false)); 105 } 106 107 108 public boolean lockForProcessing(AsyncTask task) { 109 Log.fine("Locking task for processing {0}", task.getId()); 110 //String errMessage = String.format("Failed to lock task id=%s class=%s customKey=%s", task.getId(), task.getHandlerName(), task.getCustomKey(); 111 if (task.getLockedAt() > 0) { 112 return false; 113 } 114 File file = new File(pathForPending(task)); 115 File dest = new File(pathForLocked(task)); 116 boolean succeeded = file.renameTo(dest); 117 if (!succeeded) { 118 return false; 119 } 120 task.setLockedAt(DateUtils.mils()); 121 task.setLockUuid("thread-" + Thread.currentThread().getId()); 122 persist(task); 123 return true; 124 } 125 126 public boolean markComplete(AsyncTask task) { 127 File file = new File(pathForLocked(task)); 128 File dest = new File(pathForCompleted(task)); 129 boolean succeeded = file.renameTo(dest); 130 if (!succeeded) { 131 return false; 132 } 133 task.setCompletedAt(DateUtils.mils()); 134 persist(task); 135 return true; 136 } 137 138 public boolean markFailed(AsyncTask task, Throwable e) { 139 File file = new File(fullFilePathForObj(task)); 140 task.setTryCount(task.getTryCount() + 1); 141 142 task.setErrorMessage(e.toString() + ExceptionUtils.getStackTrace(e)); 143 if (task.getTryCount() >= 5) { 144 task.setFailedAt(DateUtils.mils()); 145 } else { 146 task.setExecuteAt(DateUtils.mils() + ((2^task.getTryCount())*1000)); 147 task.setLockedAt(0); 148 task.setLockUuid(""); 149 } 150 File dest = new File(fullFilePathForObj(task)); 151 boolean succeeded = file.renameTo(dest); 152 if (!succeeded) { 153 return false; 154 } 155 persist(task); 156 return true; 157 } 158 159 @Override 160 public String fullFilePathForObj(Model model) { 161 AsyncTask task = (AsyncTask)model; 162 String path; 163 if (task.getCompletedAt() > 0) { 164 path = pathForCompleted(task); 165 } else if (task.getFailedAt() > 0) { 166 path = pathForFailed(task); 167 } else if (task.getLockedAt() > 0) { 168 path = pathForLocked(task); 169 } else { 170 path = pathForPending(task); 171 } 172 return path; 173 } 174 175 @Override 176 public void deleteOldTasks() { 177 File dir = new File(getBucketFolderPath() + "/completed"); 178 Long before = DateUtils.utcNow().minusDays(40).toInstant().toEpochMilli(); 179 for(File f: dir.listFiles()) { 180 if (!f.isFile() || f.isHidden()) { 181 continue; 182 } 183 if (f.getName().startsWith(".") || f.getName().startsWith("~") || f.getName().startsWith("#")) { 184 continue; 185 } 186 if (f.getName().endsWith(".json")) { 187 continue; 188 } 189 if (f.lastModified() < before) { 190 f.delete(); 191 } 192 } 193 194 } 195 196 @Override 197 public String relativeFilePathForObj(Model model) { 198 AsyncTask task = (AsyncTask)model; 199 String path = fullFilePathForObj(model); 200 return path.replace(getBucketFolderPath(), ""); 201 } 202 203 private String pathForCompleted(AsyncTask task) { 204 String path = getBucketFolderPath() + "/completed/"; 205 path += task.getId() + ".json"; 206 return path; 207 } 208 209 private String pathForPending(AsyncTask task) { 210 String path = getBucketFolderPath() + "/pending/"; 211 path += task.getId() + ".json"; 212 return path; 213 } 214 215 private String pathForLocked(AsyncTask task) { 216 String path = getBucketFolderPath() + "/locked/"; 217 path += task.getId() + ".json"; 218 return path; 219 } 220 221 private String pathForFailed(AsyncTask task) { 222 String path = getBucketFolderPath() + "/failed/"; 223 path += task.getId() + ".json"; 224 return path; 225 } 226 227 228}