001package org.apache.commons.jcs3.auxiliary.remote.server; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.io.Serializable; 024import java.rmi.RemoteException; 025import java.rmi.registry.Registry; 026import java.rmi.server.RMISocketFactory; 027import java.rmi.server.UnicastRemoteObject; 028import java.rmi.server.Unreferenced; 029import java.util.Collections; 030import java.util.Map; 031import java.util.Properties; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ConcurrentMap; 035 036import org.apache.commons.jcs3.access.exception.CacheException; 037import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener; 038import org.apache.commons.jcs3.auxiliary.remote.server.behavior.IRemoteCacheServer; 039import org.apache.commons.jcs3.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes; 040import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType; 041import org.apache.commons.jcs3.engine.CacheEventQueueFactory; 042import org.apache.commons.jcs3.engine.CacheListeners; 043import org.apache.commons.jcs3.engine.behavior.ICacheElement; 044import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue; 045import org.apache.commons.jcs3.engine.behavior.ICacheListener; 046import org.apache.commons.jcs3.engine.control.CompositeCache; 047import org.apache.commons.jcs3.engine.control.CompositeCacheManager; 048import org.apache.commons.jcs3.engine.logging.CacheEvent; 049import org.apache.commons.jcs3.engine.logging.behavior.ICacheEvent; 050import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger; 051import org.apache.commons.jcs3.log.Log; 052import org.apache.commons.jcs3.log.LogManager; 053import org.apache.commons.jcs3.utils.timing.ElapsedTimer; 054 055/** 056 * This class provides remote cache services. The remote cache server propagates events from local 057 * caches to other local caches. It can also store cached data, making it available to new clients. 058 * <p> 059 * Remote cache servers can be clustered. If the cache used by this remote cache is configured to 060 * use a remote cache of type cluster, the two remote caches will communicate with each other. 061 * Remote and put requests can be sent from one remote to another. If they are configured to 062 * broadcast such event to their client, then remove an puts can be sent to all locals in the 063 * cluster. 064 * <p> 065 * Get requests are made between clustered servers if AllowClusterGet is true. You can setup several 066 * clients to use one remote server and several to use another. The get local will be distributed 067 * between the two servers. Since caches are usually high get and low put, this should allow you to 068 * scale. 069 */ 070public class RemoteCacheServer<K, V> 071 extends UnicastRemoteObject 072 implements IRemoteCacheServer<K, V>, Unreferenced 073{ 074 public static final String DFEAULT_REMOTE_CONFIGURATION_FILE = "/remote.cache.ccf"; 075 076 /** For serialization. Don't change. */ 077 private static final long serialVersionUID = -8072345435941473116L; 078 079 /** log instance */ 080 private static final Log log = LogManager.getLog( RemoteCacheServer.class ); 081 082 /** Number of puts into the cache. */ 083 private int puts; 084 085 /** Maps cache name to CacheListeners object. association of listeners (regions). */ 086 private final transient ConcurrentMap<String, CacheListeners<K, V>> cacheListenersMap = 087 new ConcurrentHashMap<>(); 088 089 /** maps cluster listeners to regions. */ 090 private final transient ConcurrentMap<String, CacheListeners<K, V>> clusterListenersMap = 091 new ConcurrentHashMap<>(); 092 093 /** The central hub */ 094 private transient CompositeCacheManager cacheManager; 095 096 /** relates listener id with a type */ 097 private final ConcurrentMap<Long, RemoteType> idTypeMap = new ConcurrentHashMap<>(); 098 099 /** relates listener id with an ip address */ 100 private final ConcurrentMap<Long, String> idIPMap = new ConcurrentHashMap<>(); 101 102 /** Used to get the next listener id. */ 103 private final int[] listenerId = new int[1]; 104 105 /** Configuration settings. */ 106 // package protected for access by unit test code 107 final IRemoteCacheServerAttributes remoteCacheServerAttributes; 108 109 /** The interval at which we will log updates. */ 110 private static final int logInterval = 100; 111 112 /** An optional event logger */ 113 private transient ICacheEventLogger cacheEventLogger; 114 115 /** 116 * Constructor for the RemoteCacheServer object. This initializes the server with the values 117 * from the properties object. 118 * <p> 119 * @param rcsa 120 * @param config cache hub configuration 121 * @throws RemoteException 122 */ 123 protected RemoteCacheServer( final IRemoteCacheServerAttributes rcsa, final Properties config ) 124 throws RemoteException 125 { 126 super( rcsa.getServicePort() ); 127 this.remoteCacheServerAttributes = rcsa; 128 init( config ); 129 } 130 131 /** 132 * Constructor for the RemoteCacheServer object. This initializes the server with the values 133 * from the properties object. 134 * <p> 135 * @param rcsa 136 * @param config cache hub configuration 137 * @param customRMISocketFactory 138 * @throws RemoteException 139 */ 140 protected RemoteCacheServer( final IRemoteCacheServerAttributes rcsa, final Properties config, final RMISocketFactory customRMISocketFactory ) 141 throws RemoteException 142 { 143 super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory ); 144 this.remoteCacheServerAttributes = rcsa; 145 init( config ); 146 } 147 148 /** 149 * Initialize the RMI Cache Server from a properties object. 150 * <p> 151 * @param prop the configuration properties 152 * @throws RemoteException if the configuration of the cache manager instance fails 153 */ 154 private void init( final Properties prop ) throws RemoteException 155 { 156 try 157 { 158 cacheManager = createCacheManager( prop ); 159 } 160 catch (final CacheException e) 161 { 162 throw new RemoteException(e.getMessage(), e); 163 } 164 165 // cacheManager would have created a number of ICache objects. 166 // Use these objects to set up the cacheListenersMap. 167 cacheManager.getCacheNames().forEach(name -> { 168 final CompositeCache<K, V> cache = cacheManager.getCache( name ); 169 cacheListenersMap.put( name, new CacheListeners<>( cache ) ); 170 }); 171 } 172 173 /** 174 * Subclass can override this method to create the specific cache manager. 175 * <p> 176 * @param prop the configuration object. 177 * @return The cache hub configured with this configuration. 178 * 179 * @throws CacheException if the configuration cannot be loaded 180 */ 181 private static CompositeCacheManager createCacheManager( final Properties prop ) throws CacheException 182 { 183 final CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance(); 184 hub.configure( prop ); 185 return hub; 186 } 187 188 /** 189 * Puts a cache bean to the remote cache and notifies all listeners which <br> 190 * <ol> 191 * <li>have a different listener id than the originating host;</li> 192 * <li>are currently subscribed to the related cache.</li> 193 * </ol> 194 * <p> 195 * @param item 196 * @throws IOException 197 */ 198 public void put( final ICacheElement<K, V> item ) 199 throws IOException 200 { 201 update( item ); 202 } 203 204 /** 205 * @param item 206 * @throws IOException 207 */ 208 @Override 209 public void update( final ICacheElement<K, V> item ) 210 throws IOException 211 { 212 update( item, 0 ); 213 } 214 215 /** 216 * The internal processing is wrapped in event logging calls. 217 * <p> 218 * @param item 219 * @param requesterId 220 * @throws IOException 221 */ 222 @Override 223 public void update( final ICacheElement<K, V> item, final long requesterId ) 224 throws IOException 225 { 226 final ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT ); 227 try 228 { 229 processUpdate( item, requesterId ); 230 } 231 finally 232 { 233 logICacheEvent( cacheEvent ); 234 } 235 } 236 237 /** 238 * An update can come from either a local cache's remote auxiliary, or it can come from a remote 239 * server. A remote server is considered a source of type cluster. 240 * <p> 241 * If the update came from a cluster, then we should tell the cache manager that this was a 242 * remote put. This way, any lateral and remote auxiliaries configured for the region will not 243 * be updated. This is basically how a remote listener works when plugged into a local cache. 244 * <p> 245 * If the cluster is configured to keep local cluster consistency, then all listeners will be 246 * updated. This allows cluster server A to update cluster server B and then B to update its 247 * clients if it is told to keep local cluster consistency. Otherwise, server A will update 248 * server B and B will not tell its clients. If you cluster using lateral caches for instance, 249 * this is how it will work. Updates to a cluster node, will never get to the leaves. The remote 250 * cluster, with local cluster consistency, allows you to update leaves. This basically allows 251 * you to have a failover remote server. 252 * <p> 253 * Since currently a cluster will not try to get from other cluster servers, you can scale a bit 254 * with a cluster configuration. Puts and removes will be broadcasted to all clients, but the 255 * get load on a remote server can be reduced. 256 * <p> 257 * @param item 258 * @param requesterId 259 */ 260 private void processUpdate( final ICacheElement<K, V> item, final long requesterId ) 261 { 262 final ElapsedTimer timer = new ElapsedTimer(); 263 logUpdateInfo( item ); 264 265 try 266 { 267 final CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() ); 268 final boolean fromCluster = isRequestFromCluster( requesterId ); 269 270 log.debug( "In update, requesterId = [{0}] fromCluster = {1}", requesterId, fromCluster ); 271 272 // ordered cache item update and notification. 273 synchronized ( cacheDesc ) 274 { 275 try 276 { 277 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 278 279 // If the source of this request was not from a cluster, 280 // then consider it a local update. The cache manager will 281 // try to 282 // update all auxiliaries. 283 // 284 // This requires that two local caches not be connected to 285 // two clustered remote caches. The failover runner will 286 // have to make sure of this. ALos, the local cache needs 287 // avoid updating this source. Will need to pass the source 288 // id somehow. The remote cache should update all local 289 // caches 290 // but not update the cluster source. Cluster remote caches 291 // should only be updated by the server and not the 292 // RemoteCache. 293 if ( fromCluster ) 294 { 295 log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. " 296 + " requesterId [{0}]", requesterId ); 297 c.localUpdate( item ); 298 } 299 else 300 { 301 log.debug( "Put NOT from cluster, updating other auxiliaries for region. " 302 + " requesterId [{0}]", requesterId ); 303 c.update( item ); 304 } 305 } 306 catch ( final IOException ce ) 307 { 308 // swallow 309 log.info( "Exception caught updating item. requesterId [{0}]: {1}", 310 requesterId, ce.getMessage() ); 311 } 312 313 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER 314 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED 315 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency()) 316 { 317 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 318 log.debug( "qlist.length = {0}", qlist.length ); 319 for (final ICacheEventQueue<K, V> element : qlist) { 320 element.addPutEvent( item ); 321 } 322 } 323 } 324 } 325 catch ( final IOException e ) 326 { 327 if ( cacheEventLogger != null ) 328 { 329 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage() 330 + " REGION: " + item.getCacheName() + " ITEM: " + item ); 331 } 332 333 log.error( "Trouble in Update. requesterId [{0}]", requesterId, e ); 334 } 335 336 // TODO use JAMON for timing 337 log.debug( "put took {0} ms.", timer::getElapsedTime); 338 } 339 340 /** 341 * Log some details. 342 * <p> 343 * @param item 344 */ 345 private void logUpdateInfo( final ICacheElement<K, V> item ) 346 { 347 // not thread safe, but it doesn't have to be 100% accurate 348 puts++; 349 350 if ( log.isInfoEnabled() && (puts % logInterval == 0) ) 351 { 352 log.info( "puts = {0}", puts ); 353 } 354 355 log.debug( "In update, put [{0}] in [{1}]", 356 item::getKey, item::getCacheName); 357 } 358 359 /** 360 * Returns a cache value from the specified remote cache; or null if the cache or key does not 361 * exist. 362 * <p> 363 * @param cacheName 364 * @param key 365 * @return ICacheElement 366 * @throws IOException 367 */ 368 @Override 369 public ICacheElement<K, V> get( final String cacheName, final K key ) 370 throws IOException 371 { 372 return this.get( cacheName, key, 0 ); 373 } 374 375 /** 376 * Returns a cache bean from the specified cache; or null if the key does not exist. 377 * <p> 378 * Adding the requestor id, allows the cache to determine the source of the get. 379 * <p> 380 * The internal processing is wrapped in event logging calls. 381 * <p> 382 * @param cacheName 383 * @param key 384 * @param requesterId 385 * @return ICacheElement 386 * @throws IOException 387 */ 388 @Override 389 public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId ) 390 throws IOException 391 { 392 ICacheElement<K, V> element = null; 393 final ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT ); 394 try 395 { 396 element = processGet( cacheName, key, requesterId ); 397 } 398 finally 399 { 400 logICacheEvent( cacheEvent ); 401 } 402 return element; 403 } 404 405 /** 406 * Returns a cache bean from the specified cache; or null if the key does not exist. 407 * <p> 408 * Adding the requester id, allows the cache to determine the source of the get. 409 * <p> 410 * @param cacheName 411 * @param key 412 * @param requesterId 413 * @return ICacheElement 414 */ 415 private ICacheElement<K, V> processGet( final String cacheName, final K key, final long requesterId ) 416 { 417 final boolean fromCluster = isRequestFromCluster( requesterId ); 418 419 log.debug( "get [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}", 420 key, cacheName, requesterId, fromCluster ); 421 422 final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName ); 423 424 return getFromCacheListeners( key, fromCluster, cacheDesc, null ); 425 } 426 427 /** 428 * Gets the item from the associated cache listeners. 429 * <p> 430 * @param key 431 * @param fromCluster 432 * @param cacheDesc 433 * @param element 434 * @return ICacheElement 435 */ 436 private ICacheElement<K, V> getFromCacheListeners( final K key, final boolean fromCluster, final CacheListeners<K, V> cacheDesc, 437 final ICacheElement<K, V> element ) 438 { 439 ICacheElement<K, V> returnElement = element; 440 441 if ( cacheDesc != null ) 442 { 443 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 444 445 // If we have a get come in from a client and we don't have the item 446 // locally, we will allow the cache to look in other non local sources, 447 // such as a remote cache or a lateral. 448 // 449 // Since remote servers never get from clients and clients never go 450 // remote from a remote call, this 451 // will not result in any loops. 452 // 453 // This is the only instance I can think of where we allow a remote get 454 // from a remote call. The purpose is to allow remote cache servers to 455 // talk to each other. If one goes down, you want it to be able to get 456 // data from those that were up when the failed server comes back o 457 // line. 458 459 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() ) 460 { 461 log.debug( "NonLocalGet. fromCluster [{0}] AllowClusterGet [{1}]", 462 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() ); 463 returnElement = c.get( key ); 464 } 465 else 466 { 467 // Gets from cluster type remote will end up here. 468 // Gets from all clients will end up here if allow cluster get is 469 // false. 470 log.debug( "LocalGet. fromCluster [{0}] AllowClusterGet [{1}]", 471 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() ); 472 returnElement = c.localGet( key ); 473 } 474 } 475 476 return returnElement; 477 } 478 479 /** 480 * Gets all matching items. 481 * <p> 482 * @param cacheName 483 * @param pattern 484 * @return Map of keys and wrapped objects 485 * @throws IOException 486 */ 487 @Override 488 public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern ) 489 throws IOException 490 { 491 return getMatching( cacheName, pattern, 0 ); 492 } 493 494 /** 495 * Retrieves all matching keys. 496 * <p> 497 * @param cacheName 498 * @param pattern 499 * @param requesterId 500 * @return Map of keys and wrapped objects 501 * @throws IOException 502 */ 503 @Override 504 public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId ) 505 throws IOException 506 { 507 final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId, 508 ICacheEventLogger.GETMATCHING_EVENT ); 509 try 510 { 511 return processGetMatching( cacheName, pattern, requesterId ); 512 } 513 finally 514 { 515 logICacheEvent( cacheEvent ); 516 } 517 } 518 519 /** 520 * Retrieves all matching keys. 521 * <p> 522 * @param cacheName 523 * @param pattern 524 * @param requesterId 525 * @return Map of keys and wrapped objects 526 */ 527 protected Map<K, ICacheElement<K, V>> processGetMatching( final String cacheName, final String pattern, final long requesterId ) 528 { 529 final boolean fromCluster = isRequestFromCluster( requesterId ); 530 531 log.debug( "getMatching [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}", 532 pattern, cacheName, requesterId, fromCluster ); 533 534 CacheListeners<K, V> cacheDesc = null; 535 try 536 { 537 cacheDesc = getCacheListeners( cacheName ); 538 } 539 catch ( final Exception e ) 540 { 541 log.error( "Problem getting listeners.", e ); 542 543 if ( cacheEventLogger != null ) 544 { 545 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage() 546 + cacheName + " pattern: " + pattern ); 547 } 548 } 549 550 return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc ); 551 } 552 553 /** 554 * Gets the item from the associated cache listeners. 555 * <p> 556 * @param pattern 557 * @param fromCluster 558 * @param cacheDesc 559 * @return Map of keys to results 560 */ 561 private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( final String pattern, final boolean fromCluster, final CacheListeners<K, V> cacheDesc ) 562 { 563 Map<K, ICacheElement<K, V>> elements = null; 564 if ( cacheDesc != null ) 565 { 566 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 567 568 // We always want to go remote and then merge the items. But this can lead to inconsistencies after 569 // failover recovery. Removed items may show up. There is no good way to prevent this. 570 // We should make it configurable. 571 572 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() ) 573 { 574 log.debug( "NonLocalGetMatching. fromCluster [{0}] AllowClusterGet [{1}]", 575 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() ); 576 elements = c.getMatching( pattern ); 577 } 578 else 579 { 580 // Gets from cluster type remote will end up here. 581 // Gets from all clients will end up here if allow cluster get is 582 // false. 583 584 log.debug( "LocalGetMatching. fromCluster [{0}] AllowClusterGet [{1}]", 585 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() ); 586 elements = c.localGetMatching( pattern ); 587 } 588 } 589 return elements; 590 } 591 592 /** 593 * Gets multiple items from the cache based on the given set of keys. 594 * <p> 595 * @param cacheName 596 * @param keys 597 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 598 * data in cache for any of these keys 599 * @throws IOException 600 */ 601 @Override 602 public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys ) 603 throws IOException 604 { 605 return this.getMultiple( cacheName, keys, 0 ); 606 } 607 608 /** 609 * Gets multiple items from the cache based on the given set of keys. 610 * <p> 611 * The internal processing is wrapped in event logging calls. 612 * <p> 613 * @param cacheName 614 * @param keys 615 * @param requesterId 616 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 617 * data in cache for any of these keys 618 * @throws IOException 619 */ 620 @Override 621 public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId ) 622 throws IOException 623 { 624 final ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId, 625 ICacheEventLogger.GETMULTIPLE_EVENT ); 626 try 627 { 628 return processGetMultiple( cacheName, keys, requesterId ); 629 } 630 finally 631 { 632 logICacheEvent( cacheEvent ); 633 } 634 } 635 636 /** 637 * Gets multiple items from the cache based on the given set of keys. 638 * <p> 639 * @param cacheName 640 * @param keys 641 * @param requesterId 642 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 643 * data in cache for any of these keys 644 */ 645 private Map<K, ICacheElement<K, V>> processGetMultiple( final String cacheName, final Set<K> keys, final long requesterId ) 646 { 647 final boolean fromCluster = isRequestFromCluster( requesterId ); 648 649 log.debug( "getMultiple [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}", 650 keys, cacheName, requesterId, fromCluster ); 651 652 final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName ); 653 return getMultipleFromCacheListeners( keys, null, fromCluster, cacheDesc ); 654 } 655 656 /** 657 * Since a non-receiving remote cache client will not register a listener, it will not have a 658 * listener id assigned from the server. As such the remote server cannot determine if it is a 659 * cluster or a normal client. It will assume that it is a normal client. 660 * <p> 661 * @param requesterId 662 * @return true is from a cluster. 663 */ 664 private boolean isRequestFromCluster( final long requesterId ) 665 { 666 final RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) ); 667 return remoteTypeL == RemoteType.CLUSTER; 668 } 669 670 /** 671 * Gets the items from the associated cache listeners. 672 * <p> 673 * @param keys 674 * @param elements 675 * @param fromCluster 676 * @param cacheDesc 677 * @return Map 678 */ 679 private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( final Set<K> keys, final Map<K, ICacheElement<K, V>> elements, final boolean fromCluster, final CacheListeners<K, V> cacheDesc ) 680 { 681 Map<K, ICacheElement<K, V>> returnElements = elements; 682 683 if ( cacheDesc != null ) 684 { 685 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 686 687 // If we have a getMultiple come in from a client and we don't have the item 688 // locally, we will allow the cache to look in other non local sources, 689 // such as a remote cache or a lateral. 690 // 691 // Since remote servers never get from clients and clients never go 692 // remote from a remote call, this 693 // will not result in any loops. 694 // 695 // This is the only instance I can think of where we allow a remote get 696 // from a remote call. The purpose is to allow remote cache servers to 697 // talk to each other. If one goes down, you want it to be able to get 698 // data from those that were up when the failed server comes back on 699 // line. 700 701 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() ) 702 { 703 log.debug( "NonLocalGetMultiple. fromCluster [{0}] AllowClusterGet [{1}]", 704 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() ); 705 706 returnElements = c.getMultiple( keys ); 707 } 708 else 709 { 710 // Gets from cluster type remote will end up here. 711 // Gets from all clients will end up here if allow cluster get is 712 // false. 713 714 log.debug( "LocalGetMultiple. fromCluster [{0}] AllowClusterGet [{1}]", 715 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() ); 716 717 returnElements = c.localGetMultiple( keys ); 718 } 719 } 720 721 return returnElements; 722 } 723 724 /** 725 * Return the keys in the cache. 726 * <p> 727 * @param cacheName the name of the cache region 728 * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet() 729 */ 730 @Override 731 public Set<K> getKeySet(final String cacheName) throws IOException 732 { 733 return processGetKeySet( cacheName ); 734 } 735 736 /** 737 * Gets the set of keys of objects currently in the cache. 738 * <p> 739 * @param cacheName 740 * @return Set 741 */ 742 protected Set<K> processGetKeySet( final String cacheName ) 743 { 744 final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName ); 745 746 if ( cacheDesc == null ) 747 { 748 return Collections.emptySet(); 749 } 750 751 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 752 return c.getKeySet(); 753 } 754 755 /** 756 * Removes the given key from the specified remote cache. Defaults the listener id to 0. 757 * <p> 758 * @param cacheName 759 * @param key 760 * @throws IOException 761 */ 762 @Override 763 public void remove( final String cacheName, final K key ) 764 throws IOException 765 { 766 remove( cacheName, key, 0 ); 767 } 768 769 /** 770 * Remove the key from the cache region and don't tell the source listener about it. 771 * <p> 772 * The internal processing is wrapped in event logging calls. 773 * <p> 774 * @param cacheName 775 * @param key 776 * @param requesterId 777 * @throws IOException 778 */ 779 @Override 780 public void remove( final String cacheName, final K key, final long requesterId ) 781 throws IOException 782 { 783 final ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT ); 784 try 785 { 786 processRemove( cacheName, key, requesterId ); 787 } 788 finally 789 { 790 logICacheEvent( cacheEvent ); 791 } 792 } 793 794 /** 795 * Remove the key from the cache region and don't tell the source listener about it. 796 * <p> 797 * @param cacheName 798 * @param key 799 * @param requesterId 800 * @throws IOException 801 */ 802 private void processRemove( final String cacheName, final K key, final long requesterId ) 803 throws IOException 804 { 805 log.debug( "remove [{0}] from cache [{1}]", key, cacheName ); 806 807 final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName ); 808 809 final boolean fromCluster = isRequestFromCluster( requesterId ); 810 811 if ( cacheDesc != null ) 812 { 813 // best attempt to achieve ordered cache item removal and 814 // notification. 815 synchronized ( cacheDesc ) 816 { 817 boolean removeSuccess = false; 818 819 // No need to notify if it was not cached. 820 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 821 822 if ( fromCluster ) 823 { 824 log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" ); 825 removeSuccess = c.localRemove( key ); 826 } 827 else 828 { 829 log.debug( "Remove NOT from cluster, updating other auxiliaries for region" ); 830 removeSuccess = c.remove( key ); 831 } 832 833 log.debug( "remove [{0}] from cache [{1}] success (was it found) = {2}", 834 key, cacheName, removeSuccess ); 835 836 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER 837 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED 838 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency()) 839 { 840 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 841 842 for (final ICacheEventQueue<K, V> element : qlist) { 843 element.addRemoveEvent( key ); 844 } 845 } 846 } 847 } 848 } 849 850 /** 851 * Remove all keys from the specified remote cache. 852 * <p> 853 * @param cacheName 854 * @throws IOException 855 */ 856 @Override 857 public void removeAll( final String cacheName ) 858 throws IOException 859 { 860 removeAll( cacheName, 0 ); 861 } 862 863 /** 864 * Remove all keys from the specified remote cache. 865 * <p> 866 * The internal processing is wrapped in event logging calls. 867 * <p> 868 * @param cacheName 869 * @param requesterId 870 * @throws IOException 871 */ 872 @Override 873 public void removeAll( final String cacheName, final long requesterId ) 874 throws IOException 875 { 876 final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT ); 877 try 878 { 879 processRemoveAll( cacheName, requesterId ); 880 } 881 finally 882 { 883 logICacheEvent( cacheEvent ); 884 } 885 } 886 887 /** 888 * Remove all keys from the specified remote cache. 889 * <p> 890 * @param cacheName 891 * @param requesterId 892 * @throws IOException 893 */ 894 private void processRemoveAll( final String cacheName, final long requesterId ) 895 throws IOException 896 { 897 final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName ); 898 899 final boolean fromCluster = isRequestFromCluster( requesterId ); 900 901 if ( cacheDesc != null ) 902 { 903 // best attempt to achieve ordered cache item removal and 904 // notification. 905 synchronized ( cacheDesc ) 906 { 907 // No need to broadcast, or notify if it was not cached. 908 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache; 909 910 if ( fromCluster ) 911 { 912 log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" ); 913 c.localRemoveAll(); 914 } 915 else 916 { 917 log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" ); 918 c.removeAll(); 919 } 920 921 // update registered listeners 922 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency()) 923 { 924 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 925 926 for (final ICacheEventQueue<K, V> q : qlist) 927 { 928 q.addRemoveAllEvent(); 929 } 930 } 931 } 932 } 933 } 934 935 /** 936 * How many put events have we received. 937 * <p> 938 * @return puts 939 */ 940 // Currently only intended for use by unit tests 941 int getPutCount() 942 { 943 return puts; 944 } 945 946 /** 947 * Frees the specified remote cache. 948 * <p> 949 * @param cacheName 950 * @throws IOException 951 */ 952 @Override 953 public void dispose( final String cacheName ) 954 throws IOException 955 { 956 dispose( cacheName, 0 ); 957 } 958 959 /** 960 * Frees the specified remote cache. 961 * <p> 962 * @param cacheName 963 * @param requesterId 964 * @throws IOException 965 */ 966 public void dispose( final String cacheName, final long requesterId ) 967 throws IOException 968 { 969 final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT ); 970 try 971 { 972 processDispose( cacheName, requesterId ); 973 } 974 finally 975 { 976 logICacheEvent( cacheEvent ); 977 } 978 } 979 980 /** 981 * @param cacheName 982 * @param requesterId 983 * @throws IOException 984 */ 985 private void processDispose( final String cacheName, final long requesterId ) 986 throws IOException 987 { 988 log.info( "Dispose request received from listener [{0}]", requesterId ); 989 990 final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName ); 991 992 // this is dangerous 993 if ( cacheDesc != null ) 994 { 995 // best attempt to achieve ordered free-cache-op and notification. 996 synchronized ( cacheDesc ) 997 { 998 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId ); 999 1000 for (final ICacheEventQueue<K, V> element : qlist) { 1001 element.addDisposeEvent(); 1002 } 1003 cacheManager.freeCache( cacheName ); 1004 } 1005 } 1006 } 1007 1008 /** 1009 * Frees all remote caches. 1010 * <p> 1011 * @throws IOException 1012 */ 1013 @Override 1014 public void release() 1015 throws IOException 1016 { 1017 for (final CacheListeners<K, V> cacheDesc : cacheListenersMap.values()) 1018 { 1019 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 ); 1020 1021 for (final ICacheEventQueue<K, V> element : qlist) { 1022 element.addDisposeEvent(); 1023 } 1024 } 1025 cacheManager.release(); 1026 } 1027 1028 /** 1029 * Returns the cache listener for the specified cache. Creates the cache and the cache 1030 * descriptor if they do not already exist. 1031 * <p> 1032 * @param cacheName 1033 * @return The cacheListeners value 1034 */ 1035 protected CacheListeners<K, V> getCacheListeners( final String cacheName ) 1036 { 1037 1038 return cacheListenersMap.computeIfAbsent(cacheName, key -> { 1039 final CompositeCache<K, V> cache = cacheManager.getCache(key); 1040 return new CacheListeners<>( cache ); 1041 }); 1042 } 1043 1044 /** 1045 * Gets the clusterListeners attribute of the RemoteCacheServer object. 1046 * <p> 1047 * TODO may be able to remove this 1048 * @param cacheName 1049 * @return The clusterListeners value 1050 */ 1051 protected CacheListeners<K, V> getClusterListeners( final String cacheName ) 1052 { 1053 1054 return clusterListenersMap.computeIfAbsent(cacheName, key -> { 1055 final CompositeCache<K, V> cache = cacheManager.getCache( cacheName ); 1056 return new CacheListeners<>( cache ); 1057 }); 1058 } 1059 1060 /** 1061 * Gets the eventQList attribute of the RemoteCacheServer object. This returns the event queues 1062 * stored in the cacheListeners object for a particular region, if the queue is not for this 1063 * requester. 1064 * <p> 1065 * Basically, this makes sure that a request from a particular local cache, identified by its 1066 * listener id, does not result in a call to that same listener. 1067 * <p> 1068 * @param cacheListeners 1069 * @param requesterId 1070 * @return The eventQList value 1071 */ 1072 @SuppressWarnings("unchecked") // No generic arrays in java 1073 private ICacheEventQueue<K, V>[] getEventQList( final CacheListeners<K, V> cacheListeners, final long requesterId ) 1074 { 1075 final ICacheEventQueue<K, V>[] list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] ); 1076 int count = 0; 1077 // Set those not qualified to null; Count those qualified. 1078 for ( int i = 0; i < list.length; i++ ) 1079 { 1080 final ICacheEventQueue<K, V> q = list[i]; 1081 if ( q.isWorking() && q.getListenerId() != requesterId ) 1082 { 1083 count++; 1084 } 1085 else 1086 { 1087 list[i] = null; 1088 } 1089 } 1090 if ( count == list.length ) 1091 { 1092 // All qualified. 1093 return list; 1094 } 1095 1096 // Returns only the qualified. 1097 final ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count]; 1098 count = 0; 1099 for (final ICacheEventQueue<K, V> element : list) { 1100 if ( element != null ) 1101 { 1102 qq[count++] = element; 1103 } 1104 } 1105 return qq; 1106 } 1107 1108 /** 1109 * Removes dead event queues. Should clean out deregistered listeners. 1110 * <p> 1111 * @param eventQMap 1112 */ 1113 private static <KK, VV> void cleanupEventQMap( final Map<Long, ICacheEventQueue<KK, VV>> eventQMap ) 1114 { 1115 // this does not care if the q is alive (i.e. if 1116 // there are active threads; it cares if the queue 1117 // is working -- if it has not encountered errors 1118 // above the failure threshold 1119 eventQMap.entrySet().removeIf(e -> !e.getValue().isWorking()); 1120 } 1121 1122 /** 1123 * Subscribes to the specified remote cache. 1124 * <p> 1125 * If the client id is 0, then the remote cache server will increment it's local count and 1126 * assign an id to the client. 1127 * <p> 1128 * @param cacheName the specified remote cache. 1129 * @param listener object to notify for cache changes. must be synchronized since there are 1130 * remote calls involved. 1131 * @throws IOException 1132 */ 1133 @Override 1134 @SuppressWarnings("unchecked") // Need to cast to specific return type from getClusterListeners() 1135 public <KK, VV> void addCacheListener( final String cacheName, final ICacheListener<KK, VV> listener ) 1136 throws IOException 1137 { 1138 if ( cacheName == null || listener == null ) 1139 { 1140 throw new IllegalArgumentException( "cacheName and listener must not be null" ); 1141 } 1142 final CacheListeners<KK, VV> cacheListeners; 1143 1144 final IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener; 1145 1146 final String listenerAddress = ircl.getLocalHostAddress(); 1147 1148 final RemoteType remoteType = ircl.getRemoteType(); 1149 if ( remoteType == RemoteType.CLUSTER ) 1150 { 1151 log.debug( "adding cluster listener, listenerAddress [{0}]", listenerAddress ); 1152 cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName ); 1153 } 1154 else 1155 { 1156 log.debug( "adding normal listener, listenerAddress [{0}]", listenerAddress ); 1157 cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName ); 1158 } 1159 final Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap; 1160 cleanupEventQMap( eventQMap ); 1161 1162 // synchronized ( listenerId ) 1163 synchronized ( ICacheListener.class ) 1164 { 1165 long id = 0; 1166 try 1167 { 1168 id = listener.getListenerId(); 1169 // clients probably shouldn't do this. 1170 if ( id == 0 ) 1171 { 1172 // must start at one so the next gets recognized 1173 final long listenerIdB = nextListenerId(); 1174 log.debug( "listener id={0} addded for cache [{1}], listenerAddress [{2}]", 1175 listenerIdB & 0xff, cacheName, listenerAddress ); 1176 listener.setListenerId( listenerIdB ); 1177 id = listenerIdB; 1178 1179 // in case it needs synchronization 1180 final String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress [" 1181 + listenerAddress + "]"; 1182 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message ); 1183 log.info( message ); 1184 } 1185 else 1186 { 1187 final String message = "Adding listener under existing id = [" + id + "], listenerAddress [" 1188 + listenerAddress + "]"; 1189 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message ); 1190 log.info( message ); 1191 // should confirm the host is the same as we have on 1192 // record, just in case a client has made a mistake. 1193 } 1194 1195 // relate the type to an id 1196 this.idTypeMap.put( Long.valueOf( id ), remoteType); 1197 if ( listenerAddress != null ) 1198 { 1199 this.idIPMap.put( Long.valueOf( id ), listenerAddress ); 1200 } 1201 } 1202 catch ( final IOException ioe ) 1203 { 1204 final String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]"; 1205 log.error( message, ioe ); 1206 1207 if ( cacheEventLogger != null ) 1208 { 1209 cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - " 1210 + ioe.getMessage() ); 1211 } 1212 } 1213 1214 final CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<>(); 1215 final ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes 1216 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() ); 1217 1218 eventQMap.put(Long.valueOf(listener.getListenerId()), q); 1219 1220 log.info( cacheListeners ); 1221 } 1222 } 1223 1224 /** 1225 * Subscribes to all remote caches. 1226 * <p> 1227 * @param listener The feature to be added to the CacheListener attribute 1228 * @throws IOException 1229 */ 1230 @Override 1231 public <KK, VV> void addCacheListener( final ICacheListener<KK, VV> listener ) 1232 throws IOException 1233 { 1234 for (final String cacheName : cacheListenersMap.keySet()) 1235 { 1236 addCacheListener( cacheName, listener ); 1237 1238 log.debug( "Adding listener for cache [{0}]", cacheName ); 1239 } 1240 } 1241 1242 /** 1243 * Unsubscribe this listener from this region. If the listener is registered, it will be removed 1244 * from the event queue map list. 1245 * <p> 1246 * @param cacheName 1247 * @param listener 1248 * @throws IOException 1249 */ 1250 @Override 1251 public <KK, VV> void removeCacheListener( final String cacheName, final ICacheListener<KK, VV> listener ) 1252 throws IOException 1253 { 1254 removeCacheListener( cacheName, listener.getListenerId() ); 1255 } 1256 1257 /** 1258 * Unsubscribe this listener from this region. If the listener is registered, it will be removed 1259 * from the event queue map list. 1260 * <p> 1261 * @param cacheName 1262 * @param listenerId 1263 */ 1264 public void removeCacheListener( final String cacheName, final long listenerId ) 1265 { 1266 final String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]"; 1267 logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message ); 1268 log.info( message ); 1269 1270 final boolean isClusterListener = isRequestFromCluster( listenerId ); 1271 1272 CacheListeners<K, V> cacheDesc = null; 1273 1274 if ( isClusterListener ) 1275 { 1276 cacheDesc = getClusterListeners( cacheName ); 1277 } 1278 else 1279 { 1280 cacheDesc = getCacheListeners( cacheName ); 1281 } 1282 final Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap; 1283 cleanupEventQMap( eventQMap ); 1284 final ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) ); 1285 1286 if ( q != null ) 1287 { 1288 log.debug( "Found queue for cache region = [{0}] and listenerId [{1}]", 1289 cacheName, listenerId ); 1290 q.destroy(); 1291 cleanupEventQMap( eventQMap ); 1292 } 1293 else 1294 { 1295 log.debug( "Did not find queue for cache region = [{0}] and listenerId [{1}]", 1296 cacheName, listenerId ); 1297 } 1298 1299 // cleanup 1300 idTypeMap.remove( Long.valueOf( listenerId ) ); 1301 idIPMap.remove( Long.valueOf( listenerId ) ); 1302 1303 log.info( "After removing listener [{0}] cache region {1} listener size [{2}]", 1304 listenerId, cacheName, eventQMap.size() ); 1305 } 1306 1307 /** 1308 * Unsubscribes from all remote caches. 1309 * <p> 1310 * @param listener 1311 * @throws IOException 1312 */ 1313 @Override 1314 public <KK, VV> void removeCacheListener( final ICacheListener<KK, VV> listener ) 1315 throws IOException 1316 { 1317 for (final String cacheName : cacheListenersMap.keySet()) 1318 { 1319 removeCacheListener( cacheName, listener ); 1320 1321 log.info( "Removing listener for cache [{0}]", cacheName ); 1322 } 1323 } 1324 1325 /** 1326 * Shuts down the remote server. 1327 * <p> 1328 * @throws IOException 1329 */ 1330 @Override 1331 public void shutdown() 1332 throws IOException 1333 { 1334 shutdown("", Registry.REGISTRY_PORT); 1335 } 1336 1337 /** 1338 * Shuts down a server at a particular host and port. Then it calls shutdown on the cache 1339 * itself. 1340 * <p> 1341 * @param host 1342 * @param port 1343 * @throws IOException 1344 */ 1345 @Override 1346 public void shutdown( final String host, final int port ) 1347 throws IOException 1348 { 1349 log.info( "Received shutdown request. Shutting down server." ); 1350 1351 synchronized (listenerId) 1352 { 1353 for (final String cacheName : cacheListenersMap.keySet()) 1354 { 1355 for (int i = 0; i <= listenerId[0]; i++) 1356 { 1357 removeCacheListener( cacheName, i ); 1358 } 1359 1360 log.info( "Removing listener for cache [{0}]", cacheName ); 1361 } 1362 1363 cacheListenersMap.clear(); 1364 clusterListenersMap.clear(); 1365 } 1366 RemoteCacheServerFactory.shutdownImpl( host, port ); 1367 this.cacheManager.shutDown(); 1368 } 1369 1370 /** 1371 * Called by the RMI runtime sometime after the runtime determines that the reference list, the 1372 * list of clients referencing the remote object, becomes empty. 1373 */ 1374 // TODO: test out the DGC. 1375 @Override 1376 public void unreferenced() 1377 { 1378 log.info( "*** Server now unreferenced and subject to GC. ***" ); 1379 } 1380 1381 /** 1382 * Returns the next generated listener id [0,255]. 1383 * <p> 1384 * @return the listener id of a client. This should be unique for this server. 1385 */ 1386 private long nextListenerId() 1387 { 1388 long id = 0; 1389 if ( listenerId[0] == Integer.MAX_VALUE ) 1390 { 1391 synchronized ( listenerId ) 1392 { 1393 id = listenerId[0]; 1394 listenerId[0] = 0; 1395 // TODO: record & check if the generated id is currently being 1396 // used by a valid listener. Currently if the id wraps after 1397 // Long.MAX_VALUE, 1398 // we just assume it won't collide with an existing listener who 1399 // is live. 1400 } 1401 } 1402 else 1403 { 1404 synchronized ( listenerId ) 1405 { 1406 id = ++listenerId[0]; 1407 } 1408 } 1409 return id; 1410 } 1411 1412 /** 1413 * Gets the stats attribute of the RemoteCacheServer object. 1414 * <p> 1415 * @return The stats value 1416 * @throws IOException 1417 */ 1418 @Override 1419 public String getStats() 1420 throws IOException 1421 { 1422 return cacheManager.getStats(); 1423 } 1424 1425 /** 1426 * Logs an event if an event logger is configured. 1427 * <p> 1428 * @param item 1429 * @param requesterId 1430 * @param eventName 1431 * @return ICacheEvent 1432 */ 1433 private ICacheEvent<ICacheElement<K, V>> createICacheEvent( final ICacheElement<K, V> item, final long requesterId, final String eventName ) 1434 { 1435 if ( cacheEventLogger == null ) 1436 { 1437 return new CacheEvent<>(); 1438 } 1439 final String ipAddress = getExtraInfoForRequesterId( requesterId ); 1440 return cacheEventLogger 1441 .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item ); 1442 } 1443 1444 /** 1445 * Logs an event if an event logger is configured. 1446 * <p> 1447 * @param cacheName 1448 * @param key 1449 * @param requesterId 1450 * @param eventName 1451 * @return ICacheEvent 1452 */ 1453 private <T> ICacheEvent<T> createICacheEvent( final String cacheName, final T key, final long requesterId, final String eventName ) 1454 { 1455 if ( cacheEventLogger == null ) 1456 { 1457 return new CacheEvent<>(); 1458 } 1459 final String ipAddress = getExtraInfoForRequesterId( requesterId ); 1460 return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key ); 1461 } 1462 1463 /** 1464 * Logs an event if an event logger is configured. 1465 * <p> 1466 * @param source 1467 * @param eventName 1468 * @param optionalDetails 1469 */ 1470 protected void logApplicationEvent( final String source, final String eventName, final String optionalDetails ) 1471 { 1472 if ( cacheEventLogger != null ) 1473 { 1474 cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails ); 1475 } 1476 } 1477 1478 /** 1479 * Logs an event if an event logger is configured. 1480 * <p> 1481 * @param cacheEvent 1482 */ 1483 protected <T> void logICacheEvent( final ICacheEvent<T> cacheEvent ) 1484 { 1485 if ( cacheEventLogger != null ) 1486 { 1487 cacheEventLogger.logICacheEvent( cacheEvent ); 1488 } 1489 } 1490 1491 /** 1492 * Ip address for the client, if one is stored. 1493 * <p> 1494 * Protected for testing. 1495 * <p> 1496 * @param requesterId 1497 * @return String 1498 */ 1499 protected String getExtraInfoForRequesterId( final long requesterId ) 1500 { 1501 return idIPMap.get( Long.valueOf( requesterId ) ); 1502 } 1503 1504 /** 1505 * Allows it to be injected. 1506 * <p> 1507 * @param cacheEventLogger 1508 */ 1509 public void setCacheEventLogger( final ICacheEventLogger cacheEventLogger ) 1510 { 1511 this.cacheEventLogger = cacheEventLogger; 1512 } 1513}