001/*
002 * Trident - A Multithreaded Server Alternative
003 * Copyright 2014 The TridentSDK Team
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package net.tridentsdk.server.concurrent;
019
020import com.google.common.collect.ForwardingCollection;
021import com.google.common.collect.ImmutableList;
022import net.tridentsdk.concurrent.*;
023import net.tridentsdk.plugin.Plugin;
024
025import javax.annotation.concurrent.ThreadSafe;
026import java.util.Collection;
027import java.util.Iterator;
028import java.util.Queue;
029import java.util.concurrent.Callable;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.Future;
032
033/**
034 * TridentTaskScheduler is a scheduling utility that is used to reflect ScheduledTasks at a given offset of the current
035 * epoch of the server
036 *
037 * <p>The scheduler is designed to stage-heavy/run-light philosophy: most overhead in the
038 * scheduler is to the run staging, which adds the ScheduledTask to the queue, and constructs the wrapper which assigns
039 * the ScheduledTask executor and constructs the logic runnables. In contrast, running the wrapper would perform the
040 * pre-constructed logic and mark the ScheduledTask then move on. This ensures that the ScheduledTask will be delayed
041 * preferable when it is scheduled, instead of when it will be planned to run.</p>
042 *
043 * <p>Logic of ScheduledTask types:
044 * <ul>
045 * <li>Run    - Call as soon as ticked, then cancelled</li>
046 * <li>Later  - an AtomicLong is incremented each tick when the long reaches the delay, the ScheduledTask is
047 * called and cancelled</li>
048 * <li>Repeat - an AtomicLong is incremented each tick, when the long reaches the interval, the ScheduledTask is
049 * called, then the long is set to 0 and continues.</li>
050 * </ul>
051 *
052 * The difference between sync and async ScheduledTasks is sync runs on the plugin thread
053 * that is from the plugin scheduling the ScheduledTask. This is why a plugin object is required for ScheduledTask
054 * scheduling. Async runs on one of the other 2 ScheduledTask execution concurrent (because there are 3 concurrent in the
055 * scheduler).</p>
056 *
057 * <p>The benchmarks and testing units for the TridentTaskScheduler can be found at:
058 * http://git.io/nifjcg.</p>
059 *
060 * <p>Insertion logic places the ScheduledTask wrapped by the implementation of {@link
061 * net.tridentsdk.concurrent.ScheduledTask} to perform the run logic and scheduling decisions plus automatic
062 * ScheduledTask cancellation. Then, the overriden runnable with the ScheduledTask to be run is {@link
063 * net.tridentsdk.concurrent.ScheduledRunnable#markSchedule(net.tridentsdk.concurrent.ScheduledTask)}ed to indicate the
064 * ScheduledTask delegate is available.</p>
065 *
066 * <p>Thread safety is ensured a single iteration thread, the tick thread. Tasks added first put in the task list,
067 * then the task is marked. The execution has a higher priority over the access to the task scheduling period. Also,
068 * most tasks will be allowed to complete before any change is needed. Task execution occurs in a single thread,
069 * the tick method adds to an executor which does not share the state of the task implementation.</p>
070 *
071 * <p>The scheduler is high performance due to lock-free execution. The internal task list is a
072 * {@link java.util.concurrent.ConcurrentLinkedQueue}, iterated in the tick method which schedules a runnable assigned
073 * to the task during construction. The most overhead occurs when the runnable is scheduled, and when the logic for
074 * the scheduling method is decided during the task wrapper's construction.</p>
075 *
076 * @author The TridentSDK Team
077 */
078@ThreadSafe
079public class TridentTaskScheduler extends ForwardingCollection<ScheduledTask> implements Scheduler {
080    private final Queue<ScheduledTaskImpl> taskList = new ConcurrentLinkedQueue<>();
081    private final SelectableThreadPool taskExecutor = ThreadsHandler.configure("Scheduler");
082
083    private TridentTaskScheduler() {
084    }
085
086    /**
087     * Creates a new scheduler
088     *
089     * @return the new scheduler
090     */
091    public static TridentTaskScheduler create() {
092        return new TridentTaskScheduler();
093    }
094
095    public void tick() {
096        Iterator<ScheduledTaskImpl> iterator = taskList.iterator();
097        for (; iterator.hasNext(); ) {
098            iterator.next().run();
099        }
100    }
101
102    private ScheduledTaskImpl doAdd(ScheduledTaskImpl wrap) {
103        // Does not necessarily need to be atomic, as long as changes are visible
104        // taskList is thread-safe
105        // markSchedule sets volatile field
106        while (true) {
107            boolean added = taskList.add(wrap);
108            if (added) {
109                wrap.runnable().markSchedule(wrap);
110                return wrap;
111            }
112        }
113    }
114
115    public void shutdown() {
116        taskList.clear();
117        taskExecutor.shutdown();
118    }
119
120    @Override
121    public ScheduledTask asyncRun(Plugin plugin, ScheduledRunnable runnable) {
122        return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.ASYNC_RUN, runnable, -1));
123    }
124
125    @Override
126    public ScheduledTask syncRun(Plugin plugin, ScheduledRunnable runnable) {
127        return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.SYNC_RUN, runnable, -1));
128    }
129
130    @Override
131    public ScheduledTask asyncLater(Plugin plugin, ScheduledRunnable runnable, long delay) {
132        return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.ASYNC_LATER, runnable, delay));
133    }
134
135    @Override
136    public ScheduledTask syncLater(Plugin plugin, ScheduledRunnable runnable, long delay) {
137        return this.doAdd(new ScheduledTaskImpl(plugin, TaskType.SYNC_LATER, runnable, delay));
138    }
139
140    @Override
141    public ScheduledTask asyncRepeat(final Plugin plugin, final ScheduledRunnable runnable, long delay,
142                                     final long initialInterval) {
143        // Schedule repeating ScheduledTask later
144        return this.asyncLater(plugin, new ScheduledRunnable() {
145            @Override
146            public void run() {
147                doAdd(new ScheduledTaskImpl(plugin, TaskType.ASYNC_REPEAT, runnable, initialInterval));
148            }
149        }, delay);
150    }
151
152    @Override
153    public ScheduledTask syncRepeat(final Plugin plugin, final ScheduledRunnable runnable, long delay,
154                                    final long initialInterval) {
155        // Schedule repeating ScheduledTask later
156        return this.syncLater(plugin, new ScheduledRunnable() {
157            @Override
158            public void run() {
159                doAdd(new ScheduledTaskImpl(plugin, TaskType.SYNC_REPEAT, runnable, initialInterval));
160            }
161        }, delay);
162    }
163
164    @Override
165    protected Collection<ScheduledTask> delegate() {
166        return ImmutableList.copyOf(taskList);
167    }
168
169    private class ScheduledTaskImpl implements ScheduledTask {
170        private final Plugin plugin;
171        private final TaskType type;
172        private final ScheduledRunnable runnable;
173
174        private final SelectableThread executor;
175        private final Runnable runner;
176
177        private volatile long interval;
178        private long run = 0L;
179
180        public ScheduledTaskImpl(Plugin plugin, TaskType type, final ScheduledRunnable runnable, long step) {
181            this.plugin = plugin;
182            this.type = type;
183            this.runnable = runnable;
184            this.interval = step;
185
186            if (!type.name().contains("REPEAT")) {
187                this.runner = () -> {
188                    runnable.beforeRun();
189                    runnable.run();
190                    runnable.afterAsyncRun();
191                    cancel();
192                };
193            } else {
194                this.runner = () -> {
195                    runnable.beforeRun();
196                    runnable.run();
197                    runnable.afterAsyncRun();
198                };
199            }
200
201            if (type.name().contains("ASYNC")) {
202                this.executor = taskExecutor.selectCore();
203            } else {
204                this.executor = new SelectableThread() {
205                    @Override public void execute(Runnable task) { TickSync.sync(task); }
206                    @Override public <V> Future<V> submitTask(Callable<V> task) { return null; }
207                    @Override public void interrupt() {}
208                    @Override public Thread asThread() { return null; }
209                };
210            }
211        }
212
213        @Override
214        public long interval() {
215            return this.interval;
216        }
217
218        @Override
219        public void setInterval(long interval) {
220            this.interval = interval;
221        }
222
223        @Override
224        public TaskType type() {
225            return this.type;
226        }
227
228        @Override
229        public ScheduledRunnable runnable() {
230            return this.runnable;
231        }
232
233        @Override
234        public Plugin owner() {
235            return this.plugin;
236        }
237
238        @Override
239        public void cancel() {
240            taskList.remove(this);
241        }
242
243        @Override
244        public void run() {
245            switch (type) {
246                case ASYNC_RUN:
247                case SYNC_RUN:
248                    this.executor.execute(this.runner);
249                    break;
250
251                case ASYNC_LATER:
252                case SYNC_LATER:
253                    // May be over if the interval set lower
254                    if (++run >= interval)
255                        this.executor.execute(this.runner);
256                    break;
257
258                case ASYNC_REPEAT:
259                case SYNC_REPEAT:
260                    // May be over if the interval set lower
261                    if (++run >= interval) {
262                        this.executor.execute(this.runner);
263                        run = 0;
264                    }
265                    break;
266                default:
267                    throw new IllegalStateException();
268            }
269        }
270    }
271}