001/*
002 * Trident - A Multithreaded Server Alternative
003 * Copyright 2014 The TridentSDK Team
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package net.tridentsdk.server.concurrent;
019
020import com.google.common.base.Preconditions;
021import com.google.common.collect.Lists;
022import com.google.common.collect.Sets;
023import net.tridentsdk.concurrent.SelectableThread;
024import net.tridentsdk.concurrent.SelectableThreadPool;
025import net.tridentsdk.docs.InternalUseOnly;
026
027import javax.annotation.Nonnull;
028import javax.annotation.concurrent.GuardedBy;
029import javax.annotation.concurrent.ThreadSafe;
030import java.util.*;
031import java.util.concurrent.*;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.locks.LockSupport;
034import java.util.concurrent.locks.StampedLock;
035
036/**
037 * Thread pool which allows tasks and result-bearing tasks to be executed concurrently
038 *
039 * <p>Internally, this class manages a List of the workers, which are simply TaskExecutors, and a global Set of other
040 * executors. This allows all workers and executors in the server to be found easily. The worker List is an expandable 
041 * collection of internal thread workers. The decision to use a copy-on-write List instead of a Set was made based on
042 * the need for index based access, as well as the majority of operations upon the collection iterations from thread
043 * searching. Unfortunately, there are still many writes, as scaling requires the tracking of new workers, and the
044 * removal of the workers that are no longer needed.</p>
045 *
046 * <p>This thread pool always maintains the starting concurrent. Scaling is done once the current workers are occupied at
047 * the time of observation. Workers are deemed as occupied if concurrent are in the process of attempting insertion into
048 * the worker's internal queue. Workers are managed by native park and unparking, rather than using conditions. This
049 * provides numerous advantages, which include reduced overhead, as it is native, and is not bound to a particular scaleLock.
050 * Additionally, native thread scheduling provides for more control over basic thread stopping, rather than using the
051 * thread queue of a condition, or default guarding intrinsics.</p>
052 * 
053 * <p>There are two basic locking areas: first on the thread advancement counter, and in the worker itself. They are
054 * both StampedLocks, which provide increased throughput (in fact, is the primary motivator for creating this class).
055 * In place of this class can be instead, a ThreadPoolExecutor. However, many new concurrent updates in Java 8
056 * rationalize an effort to create a new class which fully utilizes those features, and subsequently providing this
057 * class which is optimized to execute the heterogeneous tasks provided by the server. The first scaleLock protects the
058 * index which to pull workers from the worker Set, and a separate scaleLock, per-worker, protects the internal Deque. A
059 * Deque was selected as it can be inserted from both ends, sizable, and is array-based. Tests confirm that array
060 * based collections do outperform their node-based counter parts, as there is reduced instantiation overhead. The
061 * explicitly declared scaleLock allows to check occupation of the worker, which increases scalability.</p>
062 * 
063 * <p>No thread pool would be complete without tuning. This class provides 3 basic tuning properties, which modify
064 * <em>expiring concurrent</em>. Expiring concurrent are new concurrent are those created to scale the executor. They are
065 * created when the current concurrent in the pool (including previously started expiring concurrent) are all occupied.
066 * One may modify the time which the worker expires, whether the task queue must be empty, and the maximum amount
067 * of concurrent in the pool.</p>
068 *
069 * @author The TridentSDK Team
070 */
071@ThreadSafe
072public class ConcurrentTaskExecutor extends AbstractExecutorService implements SelectableThreadPool {
073    private static final Set<ConcurrentTaskExecutor> EXECUTORS = Sets.newConcurrentHashSet();
074    private static final int INITIALIZING = 0;
075    private static final int STARTING = 1;
076    private static final int RUNNING = 2;
077    private static final int STOPPING = 3;
078    private static final int TERMINATED = 4;
079
080    private final String name;
081
082    private final List<SelectableThread> workerSet = Lists.newCopyOnWriteArrayList();
083    private final AtomicInteger count = new AtomicInteger();
084    private final int core;
085
086    @GuardedBy("coreLock")
087    private int coreIdx = 0;
088    private final StampedLock coreLock = new StampedLock();
089
090    @GuardedBy("scaleLock")
091    private int scaleIdx = 0;
092    private final StampedLock scaleLock = new StampedLock();
093
094    private volatile int state = INITIALIZING;
095
096    private volatile long expireIntervalMillis = 60_000;
097    private volatile boolean mustEmptyBeforeExpire = true;
098    private volatile int maxScale = 50;
099
100    @Override
101    public int maxThreads() {
102        return maxScale;
103    }
104
105    @Override
106    public void setMaxThreads(int maxScale) {
107        this.maxScale = maxScale;
108    }
109
110    @Override
111    public long threadExpiryTime() {
112        return expireIntervalMillis;
113    }
114
115    @Override
116    public void setThreadExpiryTime(long expireIntervalMillis) {
117        this.expireIntervalMillis = expireIntervalMillis;
118    }
119
120    @Override
121    public boolean mustEmptyBeforeExpire() {
122        return mustEmptyBeforeExpire;
123    }
124
125    @Override
126    public void setMustEmptyBeforeExpire(boolean mustEmptyBeforeExpire) {
127        this.mustEmptyBeforeExpire = mustEmptyBeforeExpire;
128    }
129
130    private ConcurrentTaskExecutor(int startingThreadCount, String name) {
131        Preconditions.checkArgument(startingThreadCount > 0);
132
133        this.name = name;
134        this.core = startingThreadCount;
135
136        state = STARTING;
137        for (int i = 0; i < startingThreadCount; i++) {
138            addWorker(false);
139        }
140        state = RUNNING;
141    }
142
143    public static ConcurrentTaskExecutor create(int startingThreadCount, String name) {
144        ConcurrentTaskExecutor ex = new ConcurrentTaskExecutor(startingThreadCount, name);
145        EXECUTORS.add(ex);
146        return ex;
147    }
148
149    @InternalUseOnly
150    public static Collection<ConcurrentTaskExecutor> executors() {
151        return EXECUTORS;
152    }
153
154    private ConcurrentWorker addWorker(boolean expire) {
155        ConcurrentWorker worker;
156        if (count.get() < maxThreads()) {
157            if (expire) {
158                worker = new ExpiringWorker(count.getAndIncrement());
159            } else {
160                worker = new ConcurrentWorker(count.getAndIncrement());
161            }
162
163            workerSet.add(worker);
164            worker.start();
165        } else {
166            worker = (ConcurrentWorker) workerSet.get(ThreadLocalRandom.current().nextInt(workerSet.size()));
167        }
168
169        return worker;
170    }
171
172    @Override
173    public SelectableThread selectCore() {
174        int count;
175        int max = this.core - 1;
176
177        long stamp = coreLock.readLock();
178        try {
179            count = this.coreIdx;
180        } finally {
181            coreLock.unlockRead(stamp);
182        }
183
184        if (count >= max) {
185            count = 0;
186
187            stamp = coreLock.writeLock();
188            try {
189                this.coreIdx = 0;
190            } finally {
191                coreLock.unlockWrite(stamp);
192            }
193        } else {
194            stamp = coreLock.writeLock();
195            try {
196                coreIdx++;
197            } finally {
198                coreLock.unlockWrite(stamp);
199            }
200        }
201
202        return workerSet.get(count);
203    }
204
205    // TODO HOT CONCURRENT METHOD NOT INLINEABLE: TOO LARGE
206    // 150 bytes
207    @Override
208    public SelectableThread selectNext() {
209        int count;
210        int max = this.workerSet.size();
211
212        long stamp = scaleLock.readLock();
213        try {
214            count = this.scaleIdx;
215        } finally {
216            scaleLock.unlockRead(stamp);
217        }
218
219        if (count >= max) {
220            count = 0;
221
222            stamp = scaleLock.writeLock();
223            try {
224                this.scaleIdx = 0;
225            } finally {
226                scaleLock.unlockWrite(stamp);
227            }
228        } else {
229            stamp = scaleLock.writeLock();
230            try {
231                scaleIdx++;
232            } finally {
233                scaleLock.unlockWrite(stamp);
234            }
235        }
236
237        return workerSet.get(count);
238    }
239
240    @Override
241    public SelectableThread selectScaled() {
242        for (SelectableThread ex : workerSet) {
243            ConcurrentWorker w = (ConcurrentWorker) ex;
244            if (!w.isHeld()) {
245                return w;
246            }
247        }
248
249        return addWorker(true);
250    }
251
252    @Override
253    public List<SelectableThread> workers() {
254        return workerSet;
255    }
256
257    @Override
258    public void shutdown() {
259        state = STOPPING;
260        workerSet.forEach(SelectableThread::interrupt);
261        workerSet.clear();
262        EXECUTORS.remove(this);
263        state = TERMINATED;
264    }
265
266    // Executor implementations
267
268    @Override
269    public List<Runnable> shutdownNow() {
270        shutdown();
271        return Collections.EMPTY_LIST;
272    }
273
274    @Override
275    public boolean isShutdown() {
276        return state > STOPPING;
277    }
278
279    @Override
280    public boolean isTerminated() {
281        return state == TERMINATED;
282    }
283
284    @Override
285    public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
286        long units = timeUnit.convert(System.nanoTime(), timeUnit);
287        new Thread(this::shutdownNow).start();
288
289        while (state != TERMINATED) {
290            if (timeUnit.convert(System.nanoTime(), timeUnit) - units > l) {
291                return false;
292            }
293        }
294
295        return true;
296    }
297
298    @Nonnull @Override
299    public <T> Future<T> submit(Callable<T> task) {
300        final RunnableFuture<T> future = new FutureTask<>(task);
301
302        execute(future::run);
303        return future;
304    }
305
306    // TODO HOT CONCURRENT METHOD NOT INLINEABLE: CODE CACHE TOO SMALL
307    // 65 bytes
308    @Override
309    public void execute(@Nonnull Runnable runnable) {
310        for (SelectableThread ex : workerSet) {
311            ConcurrentWorker w = (ConcurrentWorker) ex;
312            if (!w.isHeld()) {
313                w.execute(runnable);
314                return;
315            }
316        }
317
318        ConcurrentWorker w = addWorker(true);
319        w.execute(runnable);
320    }
321
322    // Workers
323
324    private class ConcurrentWorker extends Thread implements SelectableThread {
325        @GuardedBy("scaleLock")
326        final Deque<Runnable> tasks = new ArrayDeque<>(64);
327        final StampedLock lock = new StampedLock();
328
329        volatile boolean held;
330
331        public ConcurrentWorker(int index) {
332            super("Pool " + name + " #" + index);
333        }
334
335        @Override
336        public void run() {
337            while (!isInterrupted()) {
338                try {
339                    Runnable runnable = nextTask();
340                    if (runnable == null) {
341                        held = false;
342                        LockSupport.park();
343                    } else {
344                        runnable.run();
345                    }
346                } catch (Exception e) {
347                    e.printStackTrace();
348                }
349            }
350        }
351
352        Runnable nextTask() {
353            long stamp = lock.writeLock();
354            try {
355                return tasks.pollLast();
356            } finally {
357                lock.unlockWrite(stamp);
358            }
359        }
360
361        boolean isHeld() {
362             return held;
363        }
364
365        @Override
366        public void execute(Runnable task) {
367            if (Thread.currentThread().equals(asThread())) {
368                task.run();
369                return;
370            }
371
372            long stamp = lock.writeLock();
373            try {
374                tasks.offerFirst(task);
375            } finally {
376                lock.unlockWrite(stamp);
377            }
378
379            held = true;
380            LockSupport.unpark(this);
381        }
382
383        @Override
384        public <V> Future<V> submitTask(Callable<V> task) {
385            final RunnableFuture<V> future = new FutureTask<>(task);
386
387            execute(future::run);
388            return future;
389        }
390
391        @Override
392        public void interrupt() {
393            LockSupport.unpark(asThread());
394            super.interrupt();
395
396            long stamp = lock.writeLock();
397            try {
398                tasks.clear();
399            } finally {
400                lock.unlockWrite(stamp);
401            }
402        }
403
404        @Override
405        public Thread asThread() {
406            return this;
407        }
408    }
409
410    private class ExpiringWorker extends ConcurrentWorker {
411        long last = System.currentTimeMillis();
412
413        public ExpiringWorker(int index) {
414            super(index);
415        }
416
417        @Override
418        Runnable nextTask() {
419            long stamp = lock.writeLock();
420            Runnable runnable;
421            try {
422                runnable = tasks.pollLast();
423            } finally {
424                lock.unlockWrite(stamp);
425            }
426
427            if (runnable == null) {
428                // Expiration mechanics, in the case of spurious wakeups
429                long time = System.currentTimeMillis();
430                if ((time - this.last) >= expireIntervalMillis) {
431                    this.interrupt();
432                }
433
434                // Processing tasks very very quickly can result in stackoverflows
435                // if this method is called too often recursively
436                return () -> LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(expireIntervalMillis));
437            } else {
438                // Expiration mechanics
439                long time = System.currentTimeMillis();
440                if ((time - this.last) >= expireIntervalMillis) {
441                    if (mustEmptyBeforeExpire) {
442                        if (isEmpty()) {
443                            return () -> {
444                                runnable.run();
445                                this.interrupt();
446                            };
447                        }
448                    }
449                }
450
451                this.last = time;
452                return runnable;
453            }
454        }
455
456        // TODO HOT CONCURRENT METHOD NOT INLINEABLE: TOO LARGE
457        // 79 bytes
458        @Override
459        public void interrupt() {
460            // Most important thing: don't allow new tasks to be submitted
461            workerSet.remove(this);
462            count.decrementAndGet();
463
464            Queue<Runnable> left;
465            long stamp = lock.readLock();
466            try {
467                left = tasks;
468            } finally {
469                lock.unlockRead(stamp);
470            }
471
472            // in case I dun goofed
473            left.forEach(r -> selectCore().execute(r));
474
475            super.interrupt();
476        }
477
478        private boolean isEmpty() {
479            long stamp = lock.readLock();
480            try {
481                return tasks.isEmpty();
482            } finally {
483                lock.unlockRead(stamp);
484            }
485        }
486    }
487}