001package org.apache.commons.jcs3.utils.threadpool; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.util.Iterator; 023import java.util.Map; 024import java.util.Properties; 025import java.util.Set; 026import java.util.concurrent.BlockingQueue; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034 035import org.apache.commons.jcs3.log.Log; 036import org.apache.commons.jcs3.log.LogManager; 037import org.apache.commons.jcs3.utils.config.PropertySetter; 038 039/** 040 * This manages threadpools for an application 041 * <p> 042 * It is a singleton since threads need to be managed vm wide. 043 * </p> 044 * <p> 045 * This manager forces you to use a bounded queue. By default it uses the current thread for 046 * execution when the buffer is full and no free threads can be created. 047 * </p> 048 * <p> 049 * You can specify the props file to use or pass in a properties object prior to configuration. 050 * </p> 051 * <p> 052 * If set, the Properties object will take precedence. 053 * </p> 054 * <p> 055 * If a value is not set for a particular pool, the hard coded defaults in <code>PoolConfiguration</code> will be used. 056 * You can configure default settings by specifying <code>thread_pool.default</code> in the properties, ie "cache.ccf" 057 * </p> 058 */ 059public class ThreadPoolManager 060{ 061 /** The logger */ 062 private static final Log log = LogManager.getLog( ThreadPoolManager.class ); 063 064 /** The default config, created using property defaults if present, else those above. */ 065 private PoolConfiguration defaultConfig; 066 067 /** The default scheduler config, created using property defaults if present, else those above. */ 068 private PoolConfiguration defaultSchedulerConfig; 069 070 /** the root property name */ 071 private static final String PROP_NAME_ROOT = "thread_pool"; 072 073 /** default property file name */ 074 private static final String DEFAULT_PROP_NAME_ROOT = "thread_pool.default"; 075 076 /** the scheduler root property name */ 077 private static final String PROP_NAME_SCHEDULER_ROOT = "scheduler_pool"; 078 079 /** default scheduler property file name */ 080 private static final String DEFAULT_PROP_NAME_SCHEDULER_ROOT = "scheduler_pool.default"; 081 082 /** 083 * You can specify the properties to be used to configure the thread pool. Setting this post 084 * initialization will have no effect. 085 */ 086 private static volatile Properties props; 087 088 /** Map of names to pools. */ 089 private final ConcurrentHashMap<String, ExecutorService> pools; 090 091 /** Map of names to scheduler pools. */ 092 private final ConcurrentHashMap<String, ScheduledExecutorService> schedulerPools; 093 094 /** 095 * The ThreadPoolManager instance (holder pattern) 096 */ 097 private static class ThreadPoolManagerHolder 098 { 099 static final ThreadPoolManager INSTANCE = new ThreadPoolManager(); 100 } 101 102 /** 103 * No instances please. This is a singleton. 104 */ 105 private ThreadPoolManager() 106 { 107 this.pools = new ConcurrentHashMap<>(); 108 this.schedulerPools = new ConcurrentHashMap<>(); 109 configure(); 110 } 111 112 /** 113 * Creates a pool based on the configuration info. 114 * <p> 115 * @param config the pool configuration 116 * @param threadNamePrefix prefix for the thread names of the pool 117 * @return A ThreadPool wrapper 118 */ 119 public ExecutorService createPool( final PoolConfiguration config, final String threadNamePrefix) 120 { 121 return createPool(config, threadNamePrefix, Thread.NORM_PRIORITY); 122 } 123 124 /** 125 * Creates a pool based on the configuration info. 126 * <p> 127 * @param config the pool configuration 128 * @param threadNamePrefix prefix for the thread names of the pool 129 * @param threadPriority the priority of the created threads 130 * @return A ThreadPool wrapper 131 */ 132 public ExecutorService createPool( final PoolConfiguration config, final String threadNamePrefix, final int threadPriority ) 133 { 134 BlockingQueue<Runnable> queue = null; 135 if ( config.isUseBoundary() ) 136 { 137 log.debug( "Creating a Bounded Buffer to use for the pool" ); 138 queue = new LinkedBlockingQueue<>(config.getBoundarySize()); 139 } 140 else 141 { 142 log.debug( "Creating a non bounded Linked Queue to use for the pool" ); 143 queue = new LinkedBlockingQueue<>(); 144 } 145 146 final ThreadPoolExecutor pool = new ThreadPoolExecutor( 147 config.getStartUpSize(), 148 config.getMaximumPoolSize(), 149 config.getKeepAliveTime(), 150 TimeUnit.MILLISECONDS, 151 queue, 152 new DaemonThreadFactory(threadNamePrefix, threadPriority)); 153 154 // when blocked policy 155 switch (config.getWhenBlockedPolicy()) 156 { 157 case ABORT: 158 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 159 break; 160 161 case RUN: 162 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 163 break; 164 165 case WAIT: 166 throw new RuntimeException("POLICY_WAIT no longer supported"); 167 168 case DISCARDOLDEST: 169 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); 170 break; 171 172 default: 173 break; 174 } 175 176 pool.prestartAllCoreThreads(); 177 178 return pool; 179 } 180 181 /** 182 * Creates a scheduler pool based on the configuration info. 183 * <p> 184 * @param config the pool configuration 185 * @param threadNamePrefix prefix for the thread names of the pool 186 * @param threadPriority the priority of the created threads 187 * @return A ScheduledExecutorService 188 */ 189 public ScheduledExecutorService createSchedulerPool( final PoolConfiguration config, final String threadNamePrefix, final int threadPriority ) 190 { 191 192 return Executors.newScheduledThreadPool( 193 config.getMaximumPoolSize(), 194 new DaemonThreadFactory(threadNamePrefix, threadPriority)); 195 } 196 197 /** 198 * Returns a configured instance of the ThreadPoolManger To specify a configuration file or 199 * Properties object to use call the appropriate setter prior to calling getInstance. 200 * <p> 201 * @return The single instance of the ThreadPoolManager 202 */ 203 public static ThreadPoolManager getInstance() 204 { 205 return ThreadPoolManagerHolder.INSTANCE; 206 } 207 208 /** 209 * Dispose of the instance of the ThreadPoolManger and shut down all thread pools 210 */ 211 public static void dispose() 212 { 213 for ( final Iterator<Map.Entry<String, ExecutorService>> i = 214 getInstance().pools.entrySet().iterator(); i.hasNext(); ) 215 { 216 final Map.Entry<String, ExecutorService> entry = i.next(); 217 try 218 { 219 entry.getValue().shutdownNow(); 220 } 221 catch (final Throwable t) 222 { 223 log.warn("Failed to close pool {0}", entry.getKey(), t); 224 } 225 i.remove(); 226 } 227 228 for ( final Iterator<Map.Entry<String, ScheduledExecutorService>> i = 229 getInstance().schedulerPools.entrySet().iterator(); i.hasNext(); ) 230 { 231 final Map.Entry<String, ScheduledExecutorService> entry = i.next(); 232 try 233 { 234 entry.getValue().shutdownNow(); 235 } 236 catch (final Throwable t) 237 { 238 log.warn("Failed to close pool {0}", entry.getKey(), t); 239 } 240 i.remove(); 241 } 242 } 243 244 /** 245 * Returns an executor service by name. If a service by this name does not exist in the configuration file or 246 * properties, one will be created using the default values. 247 * <p> 248 * Services are lazily created. 249 * <p> 250 * @param name 251 * @return The executor service configured for the name. 252 */ 253 public ExecutorService getExecutorService( final String name ) 254 { 255 return pools.computeIfAbsent(name, key -> { 256 log.debug( "Creating pool for name [{0}]", key ); 257 final PoolConfiguration config = loadConfig( PROP_NAME_ROOT + "." + key, defaultConfig ); 258 return createPool( config, "JCS-ThreadPoolManager-" + key + "-" ); 259 }); 260 } 261 262 /** 263 * Returns a scheduler pool by name. If a pool by this name does not exist in the configuration file or 264 * properties, one will be created using the default values. 265 * <p> 266 * Pools are lazily created. 267 * <p> 268 * @param name 269 * @return The scheduler pool configured for the name. 270 */ 271 public ScheduledExecutorService getSchedulerPool( final String name ) 272 { 273 return schedulerPools.computeIfAbsent(name, key -> { 274 log.debug( "Creating scheduler pool for name [{0}]", key ); 275 final PoolConfiguration config = loadConfig( PROP_NAME_SCHEDULER_ROOT + "." + key, 276 defaultSchedulerConfig ); 277 return createSchedulerPool( config, "JCS-ThreadPoolManager-" + key + "-", Thread.NORM_PRIORITY ); 278 }); 279 } 280 281 /** 282 * Returns the names of all configured pools. 283 * <p> 284 * @return ArrayList of string names 285 */ 286 protected Set<String> getPoolNames() 287 { 288 return pools.keySet(); 289 } 290 291 /** 292 * This will be used if it is not null on initialization. Setting this post initialization will 293 * have no effect. 294 * <p> 295 * @param props The props to set. 296 */ 297 public static void setProps( final Properties props ) 298 { 299 ThreadPoolManager.props = props; 300 } 301 302 /** 303 * Initialize the ThreadPoolManager and create all the pools defined in the configuration. 304 */ 305 private void configure() 306 { 307 log.debug( "Initializing ThreadPoolManager" ); 308 309 if ( props == null ) 310 { 311 log.warn( "No configuration settings found. Using hardcoded default values for all pools." ); 312 props = new Properties(); 313 } 314 315 // set initial default and then override if new settings are available 316 defaultConfig = loadConfig( DEFAULT_PROP_NAME_ROOT, new PoolConfiguration() ); 317 defaultSchedulerConfig = loadConfig( DEFAULT_PROP_NAME_SCHEDULER_ROOT, new PoolConfiguration() ); 318 } 319 320 /** 321 * Configures the PoolConfiguration settings. 322 * <p> 323 * @param root the configuration key prefix 324 * @param defaultPoolConfiguration the default configuration 325 * @return PoolConfiguration 326 */ 327 private static PoolConfiguration loadConfig( final String root, final PoolConfiguration defaultPoolConfiguration ) 328 { 329 final PoolConfiguration config = defaultPoolConfiguration.clone(); 330 PropertySetter.setProperties( config, props, root + "." ); 331 332 log.debug( "{0} PoolConfiguration = {1}", root, config ); 333 334 return config; 335 } 336}