001/* 002 * Trident - A Multithreaded Server Alternative 003 * Copyright 2014 The TridentSDK Team 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package net.tridentsdk.server.concurrent; 019 020import com.google.common.collect.ForwardingCollection; 021import com.google.common.collect.ImmutableList; 022import net.tridentsdk.concurrent.*; 023import net.tridentsdk.plugin.Plugin; 024 025import javax.annotation.concurrent.ThreadSafe; 026import java.util.Collection; 027import java.util.Iterator; 028import java.util.Queue; 029import java.util.concurrent.Callable; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.Future; 032 033/** 034 * TridentTaskScheduler is a scheduling utility that is used to reflect ScheduledTasks at a given offset of the current 035 * epoch of the server 036 * 037 * <p>The scheduler is designed to stage-heavy/run-light philosophy: most overhead in the 038 * scheduler is to the run staging, which adds the ScheduledTask to the queue, and constructs the wrapper which assigns 039 * the ScheduledTask executor and constructs the logic runnables. In contrast, running the wrapper would perform the 040 * pre-constructed logic and mark the ScheduledTask then move on. This ensures that the ScheduledTask will be delayed 041 * preferable when it is scheduled, instead of when it will be planned to run.</p> 042 * 043 * <p>Logic of ScheduledTask types: 044 * <ul> 045 * <li>Run - Call as soon as ticked, then cancelled</li> 046 * <li>Later - an AtomicLong is incremented each tick when the long reaches the delay, the ScheduledTask is 047 * called and cancelled</li> 048 * <li>Repeat - an AtomicLong is incremented each tick, when the long reaches the interval, the ScheduledTask is 049 * called, then the long is set to 0 and continues.</li> 050 * </ul> 051 * 052 * The difference between sync and async ScheduledTasks is sync runs on the plugin thread 053 * that is from the plugin scheduling the ScheduledTask. This is why a plugin object is required for ScheduledTask 054 * scheduling. Async runs on one of the other 2 ScheduledTask execution concurrent (because there are 3 concurrent in the 055 * scheduler).</p> 056 * 057 * <p>The benchmarks and testing units for the TridentTaskScheduler can be found at: 058 * http://git.io/nifjcg.</p> 059 * 060 * <p>Insertion logic places the ScheduledTask wrapped by the implementation of {@link 061 * net.tridentsdk.concurrent.ScheduledTask} to perform the run logic and scheduling decisions plus automatic 062 * ScheduledTask cancellation. Then, the overriden runnable with the ScheduledTask to be run is {@link 063 * net.tridentsdk.concurrent.ScheduledRunnable#markSchedule(net.tridentsdk.concurrent.ScheduledTask)}ed to indicate the 064 * ScheduledTask delegate is available.</p> 065 * 066 * <p>Thread safety is ensured a single iteration thread, the tick thread. Tasks added first put in the task list, 067 * then the task is marked. The execution has a higher priority over the access to the task scheduling period. Also, 068 * most tasks will be allowed to complete before any change is needed. Task execution occurs in a single thread, 069 * the tick method adds to an executor which does not share the state of the task implementation.</p> 070 * 071 * <p>The scheduler is high performance due to lock-free execution. The internal task list is a 072 * {@link java.util.concurrent.ConcurrentLinkedQueue}, iterated in the tick method which schedules a runnable assigned 073 * to the task during construction. The most overhead occurs when the runnable is scheduled, and when the logic for 074 * the scheduling method is decided during the task wrapper's construction.</p> 075 * 076 * @author The TridentSDK Team 077 */ 078@ThreadSafe 079public class TridentTaskScheduler extends ForwardingCollection<ScheduledTask> implements Scheduler { 080 private final Queue<ScheduledTaskImpl> taskList = new ConcurrentLinkedQueue<>(); 081 private final SelectableThreadPool taskExecutor = ThreadsHandler.configure("Scheduler"); 082 083 private TridentTaskScheduler() { 084 } 085 086 /** 087 * Creates a new scheduler 088 * 089 * @return the new scheduler 090 */ 091 public static TridentTaskScheduler create() { 092 return new TridentTaskScheduler(); 093 } 094 095 public void tick() { 096 Iterator<ScheduledTaskImpl> iterator = taskList.iterator(); 097 for (; iterator.hasNext(); ) { 098 iterator.next().run(); 099 } 100 } 101 102 private ScheduledTaskImpl doAdd(ScheduledTaskImpl wrap) { 103 // Does not necessarily need to be atomic, as long as changes are visible 104 // taskList is thread-safe 105 // markSchedule sets volatile field 106 while (true) { 107 boolean added = taskList.add(wrap); 108 if (added) { 109 wrap.runnable().markSchedule(wrap); 110 return wrap; 111 } 112 } 113 } 114 115 public void shutdown() { 116 taskList.clear(); 117 taskExecutor.shutdown(); 118 } 119 120 @Override 121 public ScheduledTask asyncRun(Plugin plugin, ScheduledRunnable runnable) { 122 return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.ASYNC_RUN, runnable, -1)); 123 } 124 125 @Override 126 public ScheduledTask syncRun(Plugin plugin, ScheduledRunnable runnable) { 127 return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.SYNC_RUN, runnable, -1)); 128 } 129 130 @Override 131 public ScheduledTask asyncLater(Plugin plugin, ScheduledRunnable runnable, long delay) { 132 return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.ASYNC_LATER, runnable, delay)); 133 } 134 135 @Override 136 public ScheduledTask syncLater(Plugin plugin, ScheduledRunnable runnable, long delay) { 137 return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.SYNC_LATER, runnable, delay)); 138 } 139 140 @Override 141 public ScheduledTask asyncRepeat(final Plugin plugin, final ScheduledRunnable runnable, long delay, 142 final long initialInterval) { 143 // Schedule repeating ScheduledTask later 144 return this.asyncLater(plugin, new ScheduledRunnable() { 145 @Override 146 public void run() { 147 doAdd(new ScheduledTaskImpl(plugin, TaskType.ASYNC_REPEAT, runnable, initialInterval)); 148 } 149 }, delay); 150 } 151 152 @Override 153 public ScheduledTask syncRepeat(final Plugin plugin, final ScheduledRunnable runnable, long delay, 154 final long initialInterval) { 155 // Schedule repeating ScheduledTask later 156 return this.syncLater(plugin, new ScheduledRunnable() { 157 @Override 158 public void run() { 159 doAdd(new ScheduledTaskImpl(plugin, TaskType.SYNC_REPEAT, runnable, initialInterval)); 160 } 161 }, delay); 162 } 163 164 @Override 165 protected Collection<ScheduledTask> delegate() { 166 return ImmutableList.copyOf(taskList); 167 } 168 169 private class ScheduledTaskImpl implements ScheduledTask { 170 private final Plugin plugin; 171 private final TaskType type; 172 private final ScheduledRunnable runnable; 173 174 private final SelectableThread executor; 175 private final Runnable runner; 176 177 private volatile long interval; 178 private long run = 0L; 179 180 public ScheduledTaskImpl(Plugin plugin, TaskType type, final ScheduledRunnable runnable, long step) { 181 this.plugin = plugin; 182 this.type = type; 183 this.runnable = runnable; 184 this.interval = step; 185 186 if (!type.name().contains("REPEAT")) { 187 this.runner = () -> { 188 runnable.beforeRun(); 189 runnable.run(); 190 runnable.afterAsyncRun(); 191 cancel(); 192 }; 193 } else { 194 this.runner = () -> { 195 runnable.beforeRun(); 196 runnable.run(); 197 runnable.afterAsyncRun(); 198 }; 199 } 200 201 if (type.name().contains("ASYNC")) { 202 this.executor = taskExecutor.selectCore(); 203 } else { 204 this.executor = new SelectableThread() { 205 @Override public void execute(Runnable task) { TickSync.sync(task); } 206 @Override public <V> Future<V> submitTask(Callable<V> task) { return null; } 207 @Override public void interrupt() {} 208 @Override public Thread asThread() { return null; } 209 }; 210 } 211 } 212 213 @Override 214 public long interval() { 215 return this.interval; 216 } 217 218 @Override 219 public void setInterval(long interval) { 220 this.interval = interval; 221 } 222 223 @Override 224 public TaskType type() { 225 return this.type; 226 } 227 228 @Override 229 public ScheduledRunnable runnable() { 230 return this.runnable; 231 } 232 233 @Override 234 public Plugin owner() { 235 return this.plugin; 236 } 237 238 @Override 239 public void cancel() { 240 taskList.remove(this); 241 } 242 243 @Override 244 public void run() { 245 switch (type) { 246 case ASYNC_RUN: 247 case SYNC_RUN: 248 this.executor.execute(this.runner); 249 break; 250 251 case ASYNC_LATER: 252 case SYNC_LATER: 253 // May be over if the interval set lower 254 if (++run >= interval) 255 this.executor.execute(this.runner); 256 break; 257 258 case ASYNC_REPEAT: 259 case SYNC_REPEAT: 260 // May be over if the interval set lower 261 if (++run >= interval) { 262 this.executor.execute(this.runner); 263 run = 0; 264 } 265 break; 266 default: 267 throw new IllegalStateException(); 268 } 269 } 270 } 271}