001/* 002 * Trident - A Multithreaded Server Alternative 003 * Copyright 2014 The TridentSDK Team 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package net.tridentsdk.server.concurrent; 019 020import com.google.common.base.Preconditions; 021import com.google.common.collect.Lists; 022import com.google.common.collect.Sets; 023import net.tridentsdk.concurrent.SelectableThread; 024import net.tridentsdk.concurrent.SelectableThreadPool; 025import net.tridentsdk.docs.InternalUseOnly; 026 027import javax.annotation.Nonnull; 028import javax.annotation.concurrent.GuardedBy; 029import javax.annotation.concurrent.ThreadSafe; 030import java.util.*; 031import java.util.concurrent.*; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.locks.LockSupport; 034import java.util.concurrent.locks.StampedLock; 035 036/** 037 * Thread pool which allows tasks and result-bearing tasks to be executed concurrently 038 * 039 * <p>Internally, this class manages a List of the workers, which are simply TaskExecutors, and a global Set of other 040 * executors. This allows all workers and executors in the server to be found easily. The worker List is an expandable 041 * collection of internal thread workers. The decision to use a copy-on-write List instead of a Set was made based on 042 * the need for index based access, as well as the majority of operations upon the collection iterations from thread 043 * searching. Unfortunately, there are still many writes, as scaling requires the tracking of new workers, and the 044 * removal of the workers that are no longer needed.</p> 045 * 046 * <p>This thread pool always maintains the starting concurrent. Scaling is done once the current workers are occupied at 047 * the time of observation. Workers are deemed as occupied if concurrent are in the process of attempting insertion into 048 * the worker's internal queue. Workers are managed by native park and unparking, rather than using conditions. This 049 * provides numerous advantages, which include reduced overhead, as it is native, and is not bound to a particular scaleLock. 050 * Additionally, native thread scheduling provides for more control over basic thread stopping, rather than using the 051 * thread queue of a condition, or default guarding intrinsics.</p> 052 * 053 * <p>There are two basic locking areas: first on the thread advancement counter, and in the worker itself. They are 054 * both StampedLocks, which provide increased throughput (in fact, is the primary motivator for creating this class). 055 * In place of this class can be instead, a ThreadPoolExecutor. However, many new concurrent updates in Java 8 056 * rationalize an effort to create a new class which fully utilizes those features, and subsequently providing this 057 * class which is optimized to execute the heterogeneous tasks provided by the server. The first scaleLock protects the 058 * index which to pull workers from the worker Set, and a separate scaleLock, per-worker, protects the internal Deque. A 059 * Deque was selected as it can be inserted from both ends, sizable, and is array-based. Tests confirm that array 060 * based collections do outperform their node-based counter parts, as there is reduced instantiation overhead. The 061 * explicitly declared scaleLock allows to check occupation of the worker, which increases scalability.</p> 062 * 063 * <p>No thread pool would be complete without tuning. This class provides 3 basic tuning properties, which modify 064 * <em>expiring concurrent</em>. Expiring concurrent are new concurrent are those created to scale the executor. They are 065 * created when the current concurrent in the pool (including previously started expiring concurrent) are all occupied. 066 * One may modify the time which the worker expires, whether the task queue must be empty, and the maximum amount 067 * of concurrent in the pool.</p> 068 * 069 * @author The TridentSDK Team 070 */ 071@ThreadSafe 072public class ConcurrentTaskExecutor extends AbstractExecutorService implements SelectableThreadPool { 073 private static final Set<ConcurrentTaskExecutor> EXECUTORS = Sets.newConcurrentHashSet(); 074 private static final int INITIALIZING = 0; 075 private static final int STARTING = 1; 076 private static final int RUNNING = 2; 077 private static final int STOPPING = 3; 078 private static final int TERMINATED = 4; 079 080 private final String name; 081 082 private final List<SelectableThread> workerSet = Lists.newCopyOnWriteArrayList(); 083 private final AtomicInteger count = new AtomicInteger(); 084 private final int core; 085 086 @GuardedBy("coreLock") 087 private int coreIdx = 0; 088 private final StampedLock coreLock = new StampedLock(); 089 090 @GuardedBy("scaleLock") 091 private int scaleIdx = 0; 092 private final StampedLock scaleLock = new StampedLock(); 093 094 private volatile int state = INITIALIZING; 095 096 private volatile long expireIntervalMillis = 60_000; 097 private volatile boolean mustEmptyBeforeExpire = true; 098 private volatile int maxScale = 50; 099 100 @Override 101 public int maxThreads() { 102 return maxScale; 103 } 104 105 @Override 106 public void setMaxThreads(int maxScale) { 107 this.maxScale = maxScale; 108 } 109 110 @Override 111 public long threadExpiryTime() { 112 return expireIntervalMillis; 113 } 114 115 @Override 116 public void setThreadExpiryTime(long expireIntervalMillis) { 117 this.expireIntervalMillis = expireIntervalMillis; 118 } 119 120 @Override 121 public boolean mustEmptyBeforeExpire() { 122 return mustEmptyBeforeExpire; 123 } 124 125 @Override 126 public void setMustEmptyBeforeExpire(boolean mustEmptyBeforeExpire) { 127 this.mustEmptyBeforeExpire = mustEmptyBeforeExpire; 128 } 129 130 private ConcurrentTaskExecutor(int startingThreadCount, String name) { 131 Preconditions.checkArgument(startingThreadCount > 0); 132 133 this.name = name; 134 this.core = startingThreadCount; 135 136 state = STARTING; 137 for (int i = 0; i < startingThreadCount; i++) { 138 addWorker(false); 139 } 140 state = RUNNING; 141 } 142 143 public static ConcurrentTaskExecutor create(int startingThreadCount, String name) { 144 ConcurrentTaskExecutor ex = new ConcurrentTaskExecutor(startingThreadCount, name); 145 EXECUTORS.add(ex); 146 return ex; 147 } 148 149 @InternalUseOnly 150 public static Collection<ConcurrentTaskExecutor> executors() { 151 return EXECUTORS; 152 } 153 154 private ConcurrentWorker addWorker(boolean expire) { 155 ConcurrentWorker worker; 156 if (count.get() < maxThreads()) { 157 if (expire) { 158 worker = new ExpiringWorker(count.getAndIncrement()); 159 } else { 160 worker = new ConcurrentWorker(count.getAndIncrement()); 161 } 162 163 workerSet.add(worker); 164 worker.start(); 165 } else { 166 worker = (ConcurrentWorker) workerSet.get(ThreadLocalRandom.current().nextInt(workerSet.size())); 167 } 168 169 return worker; 170 } 171 172 @Override 173 public SelectableThread selectCore() { 174 int count; 175 int max = this.core - 1; 176 177 long stamp = coreLock.readLock(); 178 try { 179 count = this.coreIdx; 180 } finally { 181 coreLock.unlockRead(stamp); 182 } 183 184 if (count >= max) { 185 count = 0; 186 187 stamp = coreLock.writeLock(); 188 try { 189 this.coreIdx = 0; 190 } finally { 191 coreLock.unlockWrite(stamp); 192 } 193 } else { 194 stamp = coreLock.writeLock(); 195 try { 196 coreIdx++; 197 } finally { 198 coreLock.unlockWrite(stamp); 199 } 200 } 201 202 return workerSet.get(count); 203 } 204 205 // TODO HOT CONCURRENT METHOD NOT INLINEABLE: TOO LARGE 206 // 150 bytes 207 @Override 208 public SelectableThread selectNext() { 209 int count; 210 int max = this.workerSet.size(); 211 212 long stamp = scaleLock.readLock(); 213 try { 214 count = this.scaleIdx; 215 } finally { 216 scaleLock.unlockRead(stamp); 217 } 218 219 if (count >= max) { 220 count = 0; 221 222 stamp = scaleLock.writeLock(); 223 try { 224 this.scaleIdx = 0; 225 } finally { 226 scaleLock.unlockWrite(stamp); 227 } 228 } else { 229 stamp = scaleLock.writeLock(); 230 try { 231 scaleIdx++; 232 } finally { 233 scaleLock.unlockWrite(stamp); 234 } 235 } 236 237 return workerSet.get(count); 238 } 239 240 @Override 241 public SelectableThread selectScaled() { 242 for (SelectableThread ex : workerSet) { 243 ConcurrentWorker w = (ConcurrentWorker) ex; 244 if (!w.isHeld()) { 245 return w; 246 } 247 } 248 249 return addWorker(true); 250 } 251 252 @Override 253 public List<SelectableThread> workers() { 254 return workerSet; 255 } 256 257 @Override 258 public void shutdown() { 259 state = STOPPING; 260 workerSet.forEach(SelectableThread::interrupt); 261 workerSet.clear(); 262 EXECUTORS.remove(this); 263 state = TERMINATED; 264 } 265 266 // Executor implementations 267 268 @Override 269 public List<Runnable> shutdownNow() { 270 shutdown(); 271 return Collections.EMPTY_LIST; 272 } 273 274 @Override 275 public boolean isShutdown() { 276 return state > STOPPING; 277 } 278 279 @Override 280 public boolean isTerminated() { 281 return state == TERMINATED; 282 } 283 284 @Override 285 public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException { 286 long units = timeUnit.convert(System.nanoTime(), timeUnit); 287 new Thread(this::shutdownNow).start(); 288 289 while (state != TERMINATED) { 290 if (timeUnit.convert(System.nanoTime(), timeUnit) - units > l) { 291 return false; 292 } 293 } 294 295 return true; 296 } 297 298 @Nonnull @Override 299 public <T> Future<T> submit(Callable<T> task) { 300 final RunnableFuture<T> future = new FutureTask<>(task); 301 302 execute(future::run); 303 return future; 304 } 305 306 // TODO HOT CONCURRENT METHOD NOT INLINEABLE: CODE CACHE TOO SMALL 307 // 65 bytes 308 @Override 309 public void execute(@Nonnull Runnable runnable) { 310 for (SelectableThread ex : workerSet) { 311 ConcurrentWorker w = (ConcurrentWorker) ex; 312 if (!w.isHeld()) { 313 w.execute(runnable); 314 return; 315 } 316 } 317 318 ConcurrentWorker w = addWorker(true); 319 w.execute(runnable); 320 } 321 322 // Workers 323 324 private class ConcurrentWorker extends Thread implements SelectableThread { 325 @GuardedBy("scaleLock") 326 final Deque<Runnable> tasks = new ArrayDeque<>(64); 327 final StampedLock lock = new StampedLock(); 328 329 volatile boolean held; 330 331 public ConcurrentWorker(int index) { 332 super("Pool " + name + " #" + index); 333 } 334 335 @Override 336 public void run() { 337 while (!isInterrupted()) { 338 try { 339 Runnable runnable = nextTask(); 340 if (runnable == null) { 341 held = false; 342 LockSupport.park(); 343 } else { 344 runnable.run(); 345 } 346 } catch (Exception e) { 347 e.printStackTrace(); 348 } 349 } 350 } 351 352 Runnable nextTask() { 353 long stamp = lock.writeLock(); 354 try { 355 return tasks.pollLast(); 356 } finally { 357 lock.unlockWrite(stamp); 358 } 359 } 360 361 boolean isHeld() { 362 return held; 363 } 364 365 @Override 366 public void execute(Runnable task) { 367 if (Thread.currentThread().equals(asThread())) { 368 task.run(); 369 return; 370 } 371 372 long stamp = lock.writeLock(); 373 try { 374 tasks.offerFirst(task); 375 } finally { 376 lock.unlockWrite(stamp); 377 } 378 379 held = true; 380 LockSupport.unpark(this); 381 } 382 383 @Override 384 public <V> Future<V> submitTask(Callable<V> task) { 385 final RunnableFuture<V> future = new FutureTask<>(task); 386 387 execute(future::run); 388 return future; 389 } 390 391 @Override 392 public void interrupt() { 393 LockSupport.unpark(asThread()); 394 super.interrupt(); 395 396 long stamp = lock.writeLock(); 397 try { 398 tasks.clear(); 399 } finally { 400 lock.unlockWrite(stamp); 401 } 402 } 403 404 @Override 405 public Thread asThread() { 406 return this; 407 } 408 } 409 410 private class ExpiringWorker extends ConcurrentWorker { 411 long last = System.currentTimeMillis(); 412 413 public ExpiringWorker(int index) { 414 super(index); 415 } 416 417 @Override 418 Runnable nextTask() { 419 long stamp = lock.writeLock(); 420 Runnable runnable; 421 try { 422 runnable = tasks.pollLast(); 423 } finally { 424 lock.unlockWrite(stamp); 425 } 426 427 if (runnable == null) { 428 // Expiration mechanics, in the case of spurious wakeups 429 long time = System.currentTimeMillis(); 430 if ((time - this.last) >= expireIntervalMillis) { 431 this.interrupt(); 432 } 433 434 // Processing tasks very very quickly can result in stackoverflows 435 // if this method is called too often recursively 436 return () -> LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(expireIntervalMillis)); 437 } else { 438 // Expiration mechanics 439 long time = System.currentTimeMillis(); 440 if ((time - this.last) >= expireIntervalMillis) { 441 if (mustEmptyBeforeExpire) { 442 if (isEmpty()) { 443 return () -> { 444 runnable.run(); 445 this.interrupt(); 446 }; 447 } 448 } 449 } 450 451 this.last = time; 452 return runnable; 453 } 454 } 455 456 // TODO HOT CONCURRENT METHOD NOT INLINEABLE: TOO LARGE 457 // 79 bytes 458 @Override 459 public void interrupt() { 460 // Most important thing: don't allow new tasks to be submitted 461 workerSet.remove(this); 462 count.decrementAndGet(); 463 464 Queue<Runnable> left; 465 long stamp = lock.readLock(); 466 try { 467 left = tasks; 468 } finally { 469 lock.unlockRead(stamp); 470 } 471 472 // in case I dun goofed 473 left.forEach(r -> selectCore().execute(r)); 474 475 super.interrupt(); 476 } 477 478 private boolean isEmpty() { 479 long stamp = lock.readLock(); 480 try { 481 return tasks.isEmpty(); 482 } finally { 483 lock.unlockRead(stamp); 484 } 485 } 486 } 487}