Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/LocalReadsAndExceptionOnWritesTimeoutStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/LocalReadsAndExceptionOnWritesTimeoutStore.java (.../LocalReadsAndExceptionOnWritesTimeoutStore.java) (revision 8351) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/LocalReadsAndExceptionOnWritesTimeoutStore.java (.../LocalReadsAndExceptionOnWritesTimeoutStore.java) (revision 8447) @@ -330,4 +330,9 @@ public WriteBehind createWriteBehind() { throw new UnsupportedOperationException(); } + + @Override + public void notifyCacheEventListenersChanged() { + writer.notifyCacheEventListenersChanged(); + } } Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/NonStopStoreWrapper.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/NonStopStoreWrapper.java (.../NonStopStoreWrapper.java) (revision 8351) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/NonStopStoreWrapper.java (.../NonStopStoreWrapper.java) (revision 8447) @@ -1872,4 +1872,29 @@ } } + /** + * {@inheritDoc} + */ + @Override + public void notifyCacheEventListenersChanged() { + // THIS IS GENERATED CODE -- DO NOT HAND MODIFY! 
+ // public abstract net.sf.ehcache.Status net.sf.ehcache.store.Store.getStatus() + nonStop.start(toolkitNonStopConfiguration); + try { + throwNonStopExceptionWhenClusterNotInit(); + this.delegate.notifyCacheEventListenersChanged(); + nonstopObserver.end(NonStopOperationOutcomes.SUCCESS); + } catch (NonStopToolkitInstantiationException e) { + handleNonStopToolkitInstantiationException(e); + getTimeoutBehavior(false).notifyCacheEventListenersChanged(); + } catch (NonStopException e) { + nonstopObserver.end(NonStopOperationOutcomes.TIMEOUT); + getTimeoutBehavior(false).notifyCacheEventListenersChanged(); + } catch (RejoinException e) { + nonstopObserver.end(NonStopOperationOutcomes.REJOIN_TIMEOUT); + getTimeoutBehavior(true).notifyCacheEventListenersChanged(); + } finally { + nonStop.finish(); + } + } } Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredSafeStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredSafeStore.java (.../ClusteredSafeStore.java) (revision 8351) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredSafeStore.java (.../ClusteredSafeStore.java) (revision 8447) @@ -1167,4 +1167,22 @@ throw new CacheException("Uncaught exception in recalculateSize() - " + t.getMessage(), t); } } + + /** + * {@inheritDoc} + */ + @Override + public void notifyCacheEventListenersChanged() throws NonStopException, RejoinException { + // THIS IS GENERATED CODE -- DO NOT HAND MODIFY! 
+ try { + this.delegateClusteredStore.notifyCacheEventListenersChanged(); + } catch (NonStopException e) { + throw e; + } catch (RejoinException e) { + throw e; + } catch (Throwable t) { + EXCEPTION_HANDLER.handleException(t); + throw new CacheException("Uncaught exception in notifyCacheEventListenersChanged() - " + t.getMessage(), t); + } + } } Index: trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/MockStoreWithFlag.java =================================================================== diff -u -N --- trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/MockStoreWithFlag.java (revision 8351) +++ trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/MockStoreWithFlag.java (revision 0) @@ -1,360 +0,0 @@ -/** - * Copyright Terracotta, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package net.sf.ehcache.constructs.nonstop; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import net.sf.ehcache.CacheException; -import net.sf.ehcache.Element; -import net.sf.ehcache.Status; -import net.sf.ehcache.concurrent.CacheLockProvider; -import net.sf.ehcache.config.CacheConfiguration; -import net.sf.ehcache.search.Attribute; -import net.sf.ehcache.search.Results; -import net.sf.ehcache.search.attribute.AttributeExtractor; -import net.sf.ehcache.store.ElementValueComparator; -import net.sf.ehcache.store.Policy; -import net.sf.ehcache.store.StoreListener; -import net.sf.ehcache.store.StoreQuery; -import net.sf.ehcache.store.TerracottaStore; -import net.sf.ehcache.writer.CacheWriterManager; -import net.sf.ehcache.writer.writebehind.WriteBehind; - -public class MockStoreWithFlag implements TerracottaStore { - - private boolean accessFlag = false; - private String lastMethodInvoked; - private final CacheLockProvider mockCacheLockProvider = new NullCacheLockProvider(); - - public void markAccessFlag() { - accessFlag = true; - Exception exception = new Exception(); - exception.fillInStackTrace(); - StackTraceElement[] stackTrace = exception.getStackTrace(); - StackTraceElement lastStackTraceElement = stackTrace[1]; - lastMethodInvoked = lastStackTraceElement.getMethodName(); - } - - public String getLastMethodInvoked() { - return lastMethodInvoked; - } - - public boolean isAccessFlagMarked() { - return accessFlag; - } - - public void clearAccessFlag() { - this.accessFlag = false; - this.lastMethodInvoked = ""; - } - - public boolean bufferFull() { - markAccessFlag(); - return false; - } - - public boolean containsKey(Object key) { - markAccessFlag(); - return false; - } - - public boolean containsKeyInMemory(Object key) { - markAccessFlag(); - return false; - } - - public boolean containsKeyOnDisk(Object key) { - 
markAccessFlag(); - return false; - } - - public void dispose() { - markAccessFlag(); - - } - - public void expireElements() { - markAccessFlag(); - - } - - public void flush() throws IOException { - markAccessFlag(); - - } - - public Element get(Object key) { - markAccessFlag(); - return null; - } - - public Policy getInMemoryEvictionPolicy() { - markAccessFlag(); - return null; - } - - public int getInMemorySize() { - markAccessFlag(); - return 0; - } - - public long getInMemorySizeInBytes() { - markAccessFlag(); - return 0; - } - - public Object getInternalContext() { - markAccessFlag(); - return mockCacheLockProvider; - } - - public List getKeys() { - markAccessFlag(); - return Collections.EMPTY_LIST; - } - - public int getOnDiskSize() { - markAccessFlag(); - return 0; - } - - public long getOnDiskSizeInBytes() { - markAccessFlag(); - return 0; - } - - public boolean hasAbortedSizeOf() { - markAccessFlag(); - return false; - } - - public Element getQuiet(Object key) { - markAccessFlag(); - return null; - } - - public Map getAllQuiet(Collection keys) { - markAccessFlag(); - Map rv = new HashMap(); - for (Object key : keys) { - rv.put(key, null); - } - return rv; - } - - public Map getAll(Collection keys) { - markAccessFlag(); - return getAllQuiet(keys); - } - - public int getSize() { - markAccessFlag(); - return 0; - } - - public Status getStatus() { - markAccessFlag(); - return null; - } - - public int getTerracottaClusteredSize() { - markAccessFlag(); - return 0; - } - - public boolean isCacheCoherent() { - markAccessFlag(); - return false; - } - - public boolean isClusterCoherent() { - markAccessFlag(); - return false; - } - - public boolean isNodeCoherent() { - markAccessFlag(); - return false; - } - - public boolean put(Element element) throws CacheException { - markAccessFlag(); - return false; - } - - public void putAll(Collection elements) throws CacheException { - markAccessFlag(); - } - - public Element putIfAbsent(Element element) throws 
NullPointerException { - markAccessFlag(); - return null; - } - - public boolean putWithWriter(Element element, CacheWriterManager writerManager) throws CacheException { - markAccessFlag(); - return false; - } - - public Element remove(Object key) { - markAccessFlag(); - return null; - } - - public void removeAll(Collection keys) { - markAccessFlag(); - } - - public void removeAll() throws CacheException { - markAccessFlag(); - } - - public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException { - markAccessFlag(); - return null; - } - - public Element removeWithWriter(Object key, CacheWriterManager writerManager) throws CacheException { - markAccessFlag(); - return null; - } - - public boolean replace(Element old, Element element, ElementValueComparator comparator) throws NullPointerException, - IllegalArgumentException { - markAccessFlag(); - return false; - } - - public Element replace(Element element) throws NullPointerException { - markAccessFlag(); - return null; - } - - public void setInMemoryEvictionPolicy(Policy policy) { - markAccessFlag(); - - } - - public void setNodeCoherent(boolean coherent) throws UnsupportedOperationException { - markAccessFlag(); - - } - - public void waitUntilClusterCoherent() throws UnsupportedOperationException { - markAccessFlag(); - } - - public void addStoreListener(StoreListener listener) { - markAccessFlag(); - } - - public void removeStoreListener(StoreListener listener) { - markAccessFlag(); - } - - public int getOffHeapSize() { - markAccessFlag(); - return 0; - } - - public long getOffHeapSizeInBytes() { - markAccessFlag(); - return 0; - } - - public boolean containsKeyOffHeap(Object key) { - markAccessFlag(); - return false; - } - - public Object getMBean() { - return null; - } - - public void setAttributeExtractors(Map extractors) { - // no-op - - } - - public Results executeQuery(StoreQuery query) { - throw new UnsupportedOperationException(); - } - - @Override - public 
Set getSearchAttributes() { - return Collections.emptySet(); - } - - public Attribute getSearchAttribute(String attributeName) { - return new Attribute(attributeName); - } - - public Set getLocalKeys() { - markAccessFlag(); - return null; - } - - public CacheConfiguration.TransactionalMode getTransactionalMode() { - markAccessFlag(); - return null; - } - - public Element unlockedGet(Object key) { - markAccessFlag(); - return null; - } - - public Element unlockedGetQuiet(Object key) { - markAccessFlag(); - return null; - } - - public Element unsafeGet(Object key) { - markAccessFlag(); - return null; - } - - public void quickClear() { - markAccessFlag(); - } - - public int quickSize() { - markAccessFlag(); - return 0; - } - - public Element unsafeGetQuiet(Object key) { - markAccessFlag(); - return null; - } - - public void recalculateSize(Object key) { - markAccessFlag(); - } - - @Override - public WriteBehind createWriteBehind() { - throw new UnsupportedOperationException(); - } - -} Index: trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/event/RegisteredEventListeners.java =================================================================== diff -u -N -r7444 -r8447 --- trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/event/RegisteredEventListeners.java (.../RegisteredEventListeners.java) (revision 7444) +++ trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/event/RegisteredEventListeners.java (.../RegisteredEventListeners.java) (revision 8447) @@ -23,8 +23,11 @@ import net.sf.ehcache.CacheOperationOutcomes; import net.sf.ehcache.CacheOperationOutcomes.ExpiredOutcome; import net.sf.ehcache.CacheStoreHelper; +import net.sf.ehcache.Ehcache; import net.sf.ehcache.Element; +import net.sf.ehcache.Status; import net.sf.ehcache.distribution.CacheReplicator; +import net.sf.ehcache.store.TerracottaStore; import org.terracotta.statistics.StatisticsManager; import org.terracotta.statistics.observer.OperationObserver; @@ -42,7 +45,7 @@ * * @author Greg Luck * 
@author Geert Bevin - * @version $Id: RegisteredEventListeners.java 7444 2013-04-26 21:16:29Z ljacomet $ + * @version $Id: RegisteredEventListeners.java 8447 2013-11-14 00:08:52Z twu $ */ public class RegisteredEventListeners { @@ -54,7 +57,7 @@ */ private final Set cacheEventListeners = new CopyOnWriteArraySet(); private final Set orderedListeners = new CopyOnWriteArraySet(); - private final Cache cache; + private final Ehcache cache; private final AtomicBoolean hasReplicator = new AtomicBoolean(false); @@ -69,13 +72,23 @@ * @param cache */ public RegisteredEventListeners(Cache cache) { - //XXX this isn't really very nice - StatisticsManager.associate(this).withParent(cache); - this.cache = cache; - helper = new CacheStoreHelper(cache); + this(cache, new CacheStoreHelper(cache)); } /** + * Construct a registered event listeners service + * + * @param cache the {@link Cache} + * @param helper helper for getting the {@link net.sf.ehcache.store.Store} out of the {@link Cache} + */ + public RegisteredEventListeners(Ehcache cache, CacheStoreHelper helper) { + //XXX this isn't really very nice + StatisticsManager.associate(this).withParent(cache); + this.cache = cache; + this.helper = helper; + } + + /** * Notifies {@link InternalCacheEventListener}s, when an update happens * @param oldElement the old element * @param newElement the new element @@ -377,6 +390,9 @@ if (result && cacheEventListener instanceof CacheReplicator) { this.hasReplicator.set(true); } + if (result) { + notifyEventListenersChangedIfNecessary(); + } return result; } @@ -423,7 +439,9 @@ } else { hasReplicator.set(false); } - + if (result) { + notifyEventListenersChangedIfNecessary(); + } return result; } @@ -470,6 +488,8 @@ } cacheEventListeners.clear(); + notifyEventListenersChangedIfNecessary(); + for (InternalCacheEventListener orderedListener : orderedListeners) { orderedListener.dispose(); } @@ -592,4 +612,9 @@ } + private void notifyEventListenersChangedIfNecessary() { + if (cache.getStatus() 
== Status.STATUS_ALIVE && helper.getStore() instanceof TerracottaStore) { + ((TerracottaStore)helper.getStore()).notifyCacheEventListenersChanged(); + } + } } Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/RejoinWithoutNonStopStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/RejoinWithoutNonStopStore.java (.../RejoinWithoutNonStopStore.java) (revision 8351) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/RejoinWithoutNonStopStore.java (.../RejoinWithoutNonStopStore.java) (revision 8447) @@ -606,4 +606,9 @@ public WriteBehind createWriteBehind() { throw new UnsupportedOperationException(); } + + @Override + public void notifyCacheEventListenersChanged() { + throw new RejoinCacheException("Client started rejoin during notifyCacheEventListenersChanged()"); + } } Index: trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/event/CacheEventListenerTest.java =================================================================== diff -u -N -r8370 -r8447 --- trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/event/CacheEventListenerTest.java (.../CacheEventListenerTest.java) (revision 8370) +++ trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/event/CacheEventListenerTest.java (.../CacheEventListenerTest.java) (revision 8447) @@ -24,6 +24,7 @@ import net.sf.ehcache.Ehcache; import net.sf.ehcache.Element; import net.sf.ehcache.event.CountingCacheEventListener.CacheEvent; +import net.sf.ehcache.store.TerracottaStore; import net.sf.ehcache.store.disk.DiskStore; import net.sf.ehcache.store.disk.DiskStoreHelper; import net.sf.ehcache.util.RetryAssert; @@ -58,13 +59,16 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static 
org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests the cache listener functionality * * @author Greg Luck - * @version $Id: CacheEventListenerTest.java 8370 2013-10-23 15:34:14Z ljacomet $ + * @version $Id: CacheEventListenerTest.java 8447 2013-11-14 00:08:52Z twu $ */ @Category(CheckShorts.class) public class CacheEventListenerTest extends AbstractCacheTest { @@ -799,4 +803,24 @@ runThreads(executables); } + + @Test + public void testNotifyChangeOnListenerRemove() throws Exception { + TerracottaStore terracottaStore = mock(TerracottaStore.class); + CacheStoreHelper storeHelper = when(mock(CacheStoreHelper.class).getStore()).thenReturn(terracottaStore).getMock(); + RegisteredEventListeners registeredEventListeners = new RegisteredEventListeners(manager.getCache(cacheName), storeHelper); + registeredEventListeners.registerListener(new TestCacheEventListener()); + verify(terracottaStore).notifyCacheEventListenersChanged(); + registeredEventListeners.unregisterListener(new TestCacheEventListener()); + verify(terracottaStore).notifyCacheEventListenersChanged(); + } + + @Test + public void testNotifyChangeOnDispose() throws Exception { + TerracottaStore terracottaStore = mock(TerracottaStore.class); + CacheStoreHelper storeHelper = when(mock(CacheStoreHelper.class).getStore()).thenReturn(terracottaStore).getMock(); + RegisteredEventListeners registeredEventListeners = new RegisteredEventListeners(manager.getCache(cacheName), storeHelper); + registeredEventListeners.dispose(); + verify(terracottaStore).notifyCacheEventListenersChanged(); + } } Index: trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/BlockingCallable.java =================================================================== diff -u -N --- trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/BlockingCallable.java (revision 5594) +++ 
trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/BlockingCallable.java (revision 0) @@ -1,59 +0,0 @@ -/** - * Copyright Terracotta, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.sf.ehcache.constructs.nonstop; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; - -public class BlockingCallable implements Callable { - - private static final Logger LOG = LoggerFactory.getLogger(BlockingCallable.class); - - private final boolean logExecution; - private final Object monitor = new Object(); - private volatile boolean blocked = true; - - public BlockingCallable() { - this(false); - } - - public BlockingCallable(boolean logExecution) { - this.logExecution = logExecution; - } - - public Void call() throws Exception { - if (logExecution) { - LOG.info("inside blocking callable"); - } - while (blocked) { - synchronized (monitor) { - monitor.wait(); - } - } - return null; - } - - public void unblock() { - synchronized (monitor) { - blocked = false; - monitor.notifyAll(); - } - } - -} Index: trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/store/TerracottaTransactionalCopyingCacheStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/store/TerracottaTransactionalCopyingCacheStore.java (.../TerracottaTransactionalCopyingCacheStore.java) (revision 8351) +++ 
trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/store/TerracottaTransactionalCopyingCacheStore.java (.../TerracottaTransactionalCopyingCacheStore.java) (revision 8447) @@ -69,4 +69,9 @@ public WriteBehind createWriteBehind() { return getUnderlyingStore().createWriteBehind(); } + + @Override + public void notifyCacheEventListenersChanged() { + getUnderlyingStore().notifyCacheEventListenersChanged(); + } } Index: trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/BlockingMockStore.java =================================================================== diff -u -N --- trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/BlockingMockStore.java (revision 8351) +++ trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/BlockingMockStore.java (revision 0) @@ -1,369 +0,0 @@ -/** - * Copyright Terracotta, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package net.sf.ehcache.constructs.nonstop; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import net.sf.ehcache.CacheException; -import net.sf.ehcache.Element; -import net.sf.ehcache.Status; -import net.sf.ehcache.concurrent.CacheLockProvider; -import net.sf.ehcache.config.CacheConfiguration; -import net.sf.ehcache.search.Attribute; -import net.sf.ehcache.search.Results; -import net.sf.ehcache.search.attribute.AttributeExtractor; -import net.sf.ehcache.store.ElementValueComparator; -import net.sf.ehcache.store.Policy; -import net.sf.ehcache.store.StoreListener; -import net.sf.ehcache.store.StoreQuery; -import net.sf.ehcache.store.TerracottaStore; -import net.sf.ehcache.writer.CacheWriterManager; -import net.sf.ehcache.writer.writebehind.WriteBehind; - -/** - * All operations in this Store never returns - * - * @author Abhishek Sanoujam - */ -public class BlockingMockStore implements TerracottaStore { - - private static final List skipMethods; - private final CacheLockProvider cacheLockProvider = new NullCacheLockProvider(); - - static { - // list of methods (in Store) which are: - // - indirectly used from other methods in Ehcache before reaching the Store layer - // - nonstopStore does not delegate these methods to the executorService - List skip = new ArrayList(); - skip.add("bufferFull"); - skip.add("containsKeyOnDisk"); - skip.add("containsKeyOffHeap"); - skip.add("getOffHeapSize"); - skipMethods = skip; - } - - private volatile boolean blocking = true; - - public void setBlocking(boolean enabled) { - this.blocking = enabled; - } - - private String getPreviousMethodName() { - StackTraceElement[] stackTrace = new Exception().fillInStackTrace().getStackTrace(); - StackTraceElement element = stackTrace[2]; - return element.getMethodName(); - } - - private void neverReturn() { - if (blocking && 
!skipMethods.contains(getPreviousMethodName())) { - try { - Thread.currentThread().join(); - } catch (Exception e) { - throw new CacheException(e); - } - } - } - - - public boolean bufferFull() { - neverReturn(); - return false; - } - - public boolean containsKey(Object key) { - neverReturn(); - return false; - } - - public boolean containsKeyInMemory(Object key) { - neverReturn(); - return false; - } - - public boolean containsKeyOnDisk(Object key) { - neverReturn(); - return false; - } - - public void dispose() { - neverReturn(); - } - - public void expireElements() { - neverReturn(); - } - - public void flush() throws IOException { - neverReturn(); - } - - public Element get(Object key) { - neverReturn(); - return null; - } - - public Policy getInMemoryEvictionPolicy() { - neverReturn(); - return null; - } - - public int getInMemorySize() { - neverReturn(); - return 0; - } - - public long getInMemorySizeInBytes() { - neverReturn(); - return 0; - } - - public Object getInternalContext() { - neverReturn(); - return cacheLockProvider; - } - - public List getKeys() { - neverReturn(); - return null; - } - - public int getOnDiskSize() { - neverReturn(); - return 0; - } - - public long getOnDiskSizeInBytes() { - neverReturn(); - return 0; - } - - public boolean hasAbortedSizeOf() { - neverReturn(); - return false; - } - - public Element getQuiet(Object key) { - neverReturn(); - return null; - } - - public Map getAll(Collection keys) { - neverReturn(); - return null; - } - - public Map getAllQuiet(Collection keys) { - neverReturn(); - return null; - } - - public int getSize() { - neverReturn(); - return 0; - } - - public Status getStatus() { - neverReturn(); - return null; - } - - public int getTerracottaClusteredSize() { - neverReturn(); - return 0; - } - - public boolean isCacheCoherent() { - neverReturn(); - return false; - } - - public boolean isClusterCoherent() { - neverReturn(); - return false; - } - - public boolean isNodeCoherent() { - neverReturn(); - return 
false; - } - - public boolean put(Element element) throws CacheException { - neverReturn(); - return false; - } - - public void putAll(Collection elements) throws CacheException { - neverReturn(); - } - - public Element putIfAbsent(Element element) throws NullPointerException { - neverReturn(); - return null; - } - - public boolean putWithWriter(Element element, CacheWriterManager writerManager) throws CacheException { - neverReturn(); - return false; - } - - public Element remove(Object key) { - neverReturn(); - return null; - } - - public void removeAll(Collection keys) { - neverReturn(); - } - - public void removeAll() throws CacheException { - neverReturn(); - } - - public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException { - neverReturn(); - return null; - } - - public Element removeWithWriter(Object key, CacheWriterManager writerManager) throws CacheException { - neverReturn(); - return null; - } - - public boolean replace(Element old, Element element, ElementValueComparator comparator) throws NullPointerException, - IllegalArgumentException { - neverReturn(); - return false; - } - - public Element replace(Element element) throws NullPointerException { - neverReturn(); - return null; - } - - public void setInMemoryEvictionPolicy(Policy policy) { - neverReturn(); - } - - public void setNodeCoherent(boolean coherent) throws UnsupportedOperationException { - neverReturn(); - } - - public void waitUntilClusterCoherent() throws UnsupportedOperationException { - neverReturn(); - } - - public void addStoreListener(StoreListener listener) { - neverReturn(); - } - - public void removeStoreListener(StoreListener listener) { - neverReturn(); - } - - public int getOffHeapSize() { - neverReturn(); - return 0; - } - - public long getOffHeapSizeInBytes() { - neverReturn(); - return 0; - } - - public boolean containsKeyOffHeap(Object key) { - neverReturn(); - return false; - } - - public Object getMBean() { - return null; - 
} - - public void setAttributeExtractors(Map extractors) { - // no-op - } - - public Results executeQuery(StoreQuery query) { - throw new UnsupportedOperationException(); - } - - public Set getSearchAttributes() { - throw new UnsupportedOperationException(); - } - - public Attribute getSearchAttribute(String attributeName) { - throw new UnsupportedOperationException(); - } - - public Set getLocalKeys() { - // should never block - return Collections.EMPTY_SET; - } - - public CacheConfiguration.TransactionalMode getTransactionalMode() { - neverReturn(); - return null; - } - - public Element unlockedGet(Object key) { - neverReturn(); - return null; - } - - public Element unlockedGetQuiet(Object key) { - neverReturn(); - return null; - } - - public Element unsafeGet(Object key) { - // unsafe gets never block - return null; - } - - @Override - public void quickClear() { - neverReturn(); - } - - @Override - public int quickSize() { - neverReturn(); - return 0; - } - - public Element unsafeGetQuiet(Object key) { - // unsafe gets never block - return null; - } - - public void recalculateSize(Object key) { - neverReturn(); - } - - @Override - public WriteBehind createWriteBehind() { - // TODO Auto-generated method stub - return null; - } - -} Index: trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/transaction/AbstractTransactionStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/transaction/AbstractTransactionStore.java (.../AbstractTransactionStore.java) (revision 8351) +++ trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/transaction/AbstractTransactionStore.java (.../AbstractTransactionStore.java) (revision 8447) @@ -356,4 +356,11 @@ return oldElement; } } + + @Override + public void notifyCacheEventListenersChanged() { + if (underlyingStore instanceof TerracottaStore) { + ((TerracottaStore)underlyingStore).notifyCacheEventListenersChanged(); + } + 
} } Index: trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/ClusteredStoreEventualCasEnabledTest.java =================================================================== diff -u -N -r8446 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/ClusteredStoreEventualCasEnabledTest.java (.../ClusteredStoreEventualCasEnabledTest.java) (revision 8446) +++ trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/ClusteredStoreEventualCasEnabledTest.java (.../ClusteredStoreEventualCasEnabledTest.java) (revision 8447) @@ -53,26 +53,11 @@ * * @author Ludovic Orban */ -public class ClusteredStoreEventualCasEnabledTest { +public class ClusteredStoreEventualCasEnabledTest extends AbstractClusteredStoreTest { private static boolean previousValue; private static Field eventual_cas_enabled; - private final ToolkitInstanceFactory toolkitInstanceFactory = mock(ToolkitInstanceFactory.class); - private final Ehcache cache = mock(Ehcache.class); - private final CacheCluster cacheCluster = mock(CacheCluster.class); - private final CacheConfiguration cacheConfiguration = new CacheConfiguration().terracotta(new TerracottaConfiguration().clustered(true) - .consistency(TerracottaConfiguration.Consistency.EVENTUAL)); - private final CacheManager cacheManager = mock(CacheManager.class); - private final ToolkitMap configMap = mock(ToolkitMap.class); - private final ToolkitInternal toolkitInternal = mock(ToolkitInternal.class); - private final ToolkitProperties toolkitProperties = mock(ToolkitProperties.class); - private final ToolkitCacheInternal toolkitCacheInternal = mock(ToolkitCacheInternal.class); - private final org.terracotta.toolkit.config.Configuration toolkitCacheConfiguration = mock(org.terracotta.toolkit.config.Configuration.class); - private final ToolkitNotifier toolkitNotifier = mock(ToolkitNotifier.class); - private ClusteredStore clusteredStore; - private final 
ToolkitReadWriteLock tkRWLock = mock(ToolkitReadWriteLock.class); - @BeforeClass public static void setUpClass() { try { @@ -99,35 +84,6 @@ } } - @Before - public void setUpClusteredStore() { - when(cache.getCacheConfiguration()).thenReturn(cacheConfiguration); - when(cache.getCacheManager()).thenReturn(cacheManager); - when(cache.getName()).thenReturn("ClusteredStoreTest-cache"); - when(cacheManager.getName()).thenReturn("ClusteredStoreTest-cm"); - when(toolkitInstanceFactory.getOrCreateClusteredStoreConfigMap(eq("ClusteredStoreTest-cm"), eq("ClusteredStoreTest-cache"))).thenReturn(configMap); - when(toolkitInstanceFactory.getToolkit()).thenReturn(toolkitInternal); - when(toolkitInternal.getProperties()).thenReturn(toolkitProperties); - when(toolkitProperties.getBoolean(anyString())).thenReturn(false); - when(toolkitInstanceFactory.getOrCreateToolkitCache(eq(cache), eq(false))).thenReturn(toolkitCacheInternal); - when(toolkitCacheInternal.getConfiguration()).thenReturn(toolkitCacheConfiguration); - when(toolkitCacheConfiguration.getInt(anyString())).thenReturn(1); - when(toolkitInstanceFactory.getOrCreateConfigChangeNotifier(eq(cache))).thenReturn(toolkitNotifier); - when(toolkitCacheInternal.createLockForKey(any())).thenReturn(tkRWLock); - when(tkRWLock.writeLock()).thenReturn(mock(ToolkitLock.class)); - clusteredStore = new ClusteredStore(toolkitInstanceFactory, cache, cacheCluster) { - @Override - void setUpWanConfig() { - // Do Nothing - } - - @Override - boolean isWANEnabled() { - return false; - } - }; - } - @Test public void clusteredStore_putIfAbsent_enabled_in_eventual_consistency() { clusteredStore.putIfAbsent(new Element("key", "value")); Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/LocalReadsOnTimeoutStore.java =================================================================== diff -u -N -r8351 -r8447 --- 
trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/LocalReadsOnTimeoutStore.java (.../LocalReadsOnTimeoutStore.java) (revision 8351) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/LocalReadsOnTimeoutStore.java (.../LocalReadsOnTimeoutStore.java) (revision 8447) @@ -34,7 +34,7 @@ import java.util.Set; /** - * A {@link NonstopStore} implementation that returns the local value in the VM, if present, for get operations and + * A {@link TerracottaStore} implementation that returns the local value in the VM, if present, for get operations and * no-op for put, remove and other operations * * @author Abhishek Sanoujam @@ -44,7 +44,7 @@ private final TerracottaStore delegate; /** - * Constructor accepting the {@link NonstopActiveDelegateHolder} + * Constructor accepting the {@link TerracottaStore} */ public LocalReadsOnTimeoutStore(TerracottaStore delegate) { this.delegate = delegate; @@ -629,4 +629,9 @@ public WriteBehind createWriteBehind() { throw new UnsupportedOperationException(); } + + @Override + public void notifyCacheEventListenersChanged() { + delegate.notifyCacheEventListenersChanged(); + } } Index: trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/terracotta/TestRejoinStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/terracotta/TestRejoinStore.java (.../TestRejoinStore.java) (revision 8351) +++ trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/terracotta/TestRejoinStore.java (.../TestRejoinStore.java) (revision 8447) @@ -396,4 +396,8 @@ throw new UnsupportedOperationException(); } + @Override + public void notifyCacheEventListenersChanged() { + alwaysCalledMethod(); + } } Index: trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/store/TerracottaStore.java =================================================================== diff -u -N -r8351 
-r8447 --- trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/store/TerracottaStore.java (.../TerracottaStore.java) (revision 8351) +++ trunk/ehcache/ehcache-core/src/main/java/net/sf/ehcache/store/TerracottaStore.java (.../TerracottaStore.java) (revision 8447) @@ -77,4 +77,9 @@ */ public WriteBehind createWriteBehind(); + /** + * Notify the underlying store that some change has occurred in the set of registered cache listeners. + */ + public void notifyCacheEventListenersChanged(); + } Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/ExceptionOnTimeoutStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/ExceptionOnTimeoutStore.java (.../ExceptionOnTimeoutStore.java) (revision 8351) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/ExceptionOnTimeoutStore.java (.../ExceptionOnTimeoutStore.java) (revision 8447) @@ -31,7 +31,7 @@ import java.util.Set; /** - * Implementation of {@link NonstopStore} that throws {@link NonStopCacheException} for all operations. + * Implementation of {@link TerracottaStore} that throws {@link NonStopCacheException} for all operations. 
* * @author Abhishek Sanoujam */ @@ -613,4 +613,9 @@ public WriteBehind createWriteBehind() { throw new UnsupportedOperationException(); } + + @Override + public void notifyCacheEventListenersChanged() { + throw new NonStopCacheException("notifyCacheEventListenersChanged() timed out"); + } } Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredStore.java =================================================================== diff -u -N -r8446 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredStore.java (.../ClusteredStore.java) (revision 8446) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredStore.java (.../ClusteredStore.java) (revision 8447) @@ -13,6 +13,7 @@ import net.sf.ehcache.Status; import net.sf.ehcache.cluster.CacheCluster; import net.sf.ehcache.cluster.ClusterNode; +import net.sf.ehcache.cluster.ClusterTopologyListener; import net.sf.ehcache.concurrent.CacheLockProvider; import net.sf.ehcache.config.CacheConfiguration; import net.sf.ehcache.config.CacheConfiguration.TransactionalMode; @@ -77,7 +78,7 @@ private static final String CHECK_CONTAINS_KEY_ON_PUT_PROPERTY_NAME = "ehcache.clusteredStore.checkContainsKeyOnPut"; private static final String TRANSACTIONAL_MODE = "trasactionalMode"; private static final String LEADER_ELECTION_LOCK_NAME = "SERVER-EVENT-SUBSCRIPTION-LOCK"; - private static final String LEADER_NODE_ID = "LEADER-NODE-ID"; + protected static final String LEADER_NODE_ID = "LEADER-NODE-ID"; private static final boolean EVENTUAL_CAS_ENABLED = Boolean.getBoolean(ENABLE_EVENTUAL_CAS_OPERATIONS_PROPERTY); // final protected fields @@ -99,6 +100,7 @@ // non-final private fields private EventListenerList listenerList; + private boolean cacheEventListenerRegistered = false; private final ToolkitLock eventualConcurrentLock; private final ToolkitLock leaderElectionLock; private final boolean 
isEventual; @@ -110,6 +112,7 @@ .build(); private final CacheCluster topology; private final ToolkitMap configMap; + private final EventListenersRefresher eventListenersRefresher; public ClusteredStore(ToolkitInstanceFactory toolkitInstanceFactory, Ehcache cache, CacheCluster topology) { validateConfig(cache); @@ -159,7 +162,9 @@ // per-cache lock to ensure only one client can register a listener leaderElectionLock = toolkitInstanceFactory.getLockForCache(cache, LEADER_ELECTION_LOCK_NAME); evictionListener = new CacheEventListener(); - backend.addListener(evictionListener); + eventListenersRefresher = new EventListenersRefresher(); + topology.addTopologyListener(eventListenersRefresher); + notifyCacheEventListenersChanged(); // just notify to initialize the registration state CacheLockProvider cacheLockProvider = new TCCacheLockProvider(backend, valueModeHandler); internalContext = new ClusteredCacheInternalContext(toolkitInstanceFactory.getToolkit(), cacheLockProvider); @@ -511,19 +516,12 @@ @Override public void dispose() { + dropLeaderStatus(); + topology.removeTopologyListener(eventListenersRefresher); backend.removeListener(evictionListener); backend.disposeLocally(); cacheConfigChangeBridge.disconnectConfigs(); toolkitInstanceFactory.removeNonStopConfigforCache(cache); - - leaderElectionLock.lock(); - try { - if (isThisNodeLeader()) { - configMap.remove(LEADER_NODE_ID); - } - } finally { - leaderElectionLock.unlock(); - } } @Override @@ -850,10 +848,21 @@ return (String) configMap.get(LEADER_NODE_ID); } - public boolean isThisNodeLeader() { + private boolean isThisNodeLeader() { return topology.getCurrentNode().getId().equals(getLeader()); } + private void dropLeaderStatus() { + leaderElectionLock.lock(); + try { + if (isThisNodeLeader()) { + configMap.remove(LEADER_NODE_ID); + } + } finally { + leaderElectionLock.unlock(); + } + } + private void electLeaderIfNecessary() { String leader; while ((leader = getLeader()) == null || isNotInCluster(leader)) { @@ 
-878,32 +887,55 @@ return true; } - private class CacheEventListener implements ToolkitCacheListener { + private class CacheEventListener implements ToolkitCacheListener { @Override - public void onEviction(Object key) { + public void onEviction(String key) { evictionObserver.begin(); evictionObserver.end(EvictionOutcome.SUCCESS); - electLeaderIfNecessary(); // only leader handles server events if (isThisNodeLeader()) { - Element element = new Element(valueModeHandler.getRealKeyObject((String) key), null); + Element element = new Element(valueModeHandler.getRealKeyObject(key), null); registeredEventListeners.notifyElementEvicted(element, false); } } @Override - public void onExpiration(Object key) { + public void onExpiration(String key) { electLeaderIfNecessary(); // only leader handles server events if (isThisNodeLeader()) { - Element element = new Element(valueModeHandler.getRealKeyObject((String) key), null); + Element element = new Element(valueModeHandler.getRealKeyObject(key), null); registeredEventListeners.notifyElementExpiry(element, false); } } } + private class EventListenersRefresher implements ClusterTopologyListener { + @Override + public void nodeJoined(final ClusterNode node) { + } + + @Override + public void nodeLeft(final ClusterNode node) { + } + + @Override + public void clusterOnline(final ClusterNode node) { + // Need to refresh once when cluster is online again in case listener unregister failure due to cluster disconnect + notifyCacheEventListenersChanged(); + } + + @Override + public void clusterOffline(final ClusterNode node) { + } + + @Override + public void clusterRejoined(final ClusterNode oldNode, final ClusterNode newNode) { + } + } + // tests assert on the log msg printed private static String getConcurrencyValueLogMsg(String name, int concurrency) { return "Cache [" + name + "] using concurrency: " + concurrency; @@ -926,6 +958,18 @@ return (int) System.currentTimeMillis() / 1000; } + @Override + public synchronized void 
notifyCacheEventListenersChanged() { + if (cache.getCacheEventNotificationService().hasCacheEventListeners() && !cacheEventListenerRegistered) { + backend.addListener(evictionListener); + cacheEventListenerRegistered = true; + } else if (!cache.getCacheEventNotificationService().hasCacheEventListeners() && cacheEventListenerRegistered) { + dropLeaderStatus(); + backend.removeListener(evictionListener); + cacheEventListenerRegistered = false; + } + } + private static class ElementValueComparatorToolkitWrapper implements ToolkitValueComparator { private final Object key; Index: trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/ThreadDump.java =================================================================== diff -u -N --- trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/ThreadDump.java (revision 5594) +++ trunk/ehcache/ehcache-core/src/test/java/net/sf/ehcache/constructs/nonstop/ThreadDump.java (revision 0) @@ -1,91 +0,0 @@ -/** - * Copyright Terracotta, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package net.sf.ehcache.constructs.nonstop; - -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; -import java.util.List; - -public abstract class ThreadDump { - private static final String NEWLINE = System.getProperty("line.separator", "\n"); - - public static List getThreadDump() { - List rv = new ArrayList(); - ThreadMXBean tbean = ManagementFactory.getThreadMXBean(); - for (long id : tbean.getAllThreadIds()) { - ThreadInfo tinfo = tbean.getThreadInfo(id); - if (tinfo != null) { - rv.add(new ThreadInformation(tinfo.getThreadId(), tinfo.getThreadName())); - } - } - return rv; - } - - public static String takeThreadDump() { - StringBuilder rv = new StringBuilder(); - ThreadMXBean tbean = ManagementFactory.getThreadMXBean(); - for (long id : tbean.getAllThreadIds()) { - ThreadInfo tinfo = tbean.getThreadInfo(id, Integer.MAX_VALUE); - if (tinfo != null) { - rv.append(tinfo).append(NEWLINE); - for (StackTraceElement e : tinfo.getStackTrace()) { - rv.append(" at ").append(e).append(NEWLINE); - } - rv.append(NEWLINE); - } - } - return rv.toString(); - } - - public static class ThreadInformation { - private final long threadId; - private final String threadName; - - public ThreadInformation(long threadId, String name) { - super(); - this.threadId = threadId; - this.threadName = name; - } - - public long getThreadId() { - return threadId; - } - - public String getThreadName() { - return threadName; - } - - @Override - public int hashCode() { - return ((int) threadId) ^ ((int) (threadId >>> 32)); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } else if (obj instanceof ThreadInformation) { - return threadId == ((ThreadInformation) obj).threadId; - } else { - return false; - } - } - - } -} Index: trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/AbstractClusteredStoreTest.java 
=================================================================== diff -u -N --- trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/AbstractClusteredStoreTest.java (revision 0) +++ trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/AbstractClusteredStoreTest.java (revision 8447) @@ -0,0 +1,81 @@ +package org.terracotta.modules.ehcache.store; + +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.CacheStoreHelper; +import net.sf.ehcache.Ehcache; +import net.sf.ehcache.Status; +import net.sf.ehcache.cluster.CacheCluster; +import net.sf.ehcache.cluster.ClusterNode; +import net.sf.ehcache.config.CacheConfiguration; +import net.sf.ehcache.config.Configuration; +import net.sf.ehcache.config.TerracottaClientConfiguration; +import net.sf.ehcache.config.TerracottaConfiguration; +import net.sf.ehcache.event.RegisteredEventListeners; +import org.junit.Before; +import org.terracotta.modules.ehcache.ToolkitInstanceFactory; +import org.terracotta.toolkit.collections.ToolkitMap; +import org.terracotta.toolkit.concurrent.locks.ToolkitLock; +import org.terracotta.toolkit.events.ToolkitNotifier; +import org.terracotta.toolkit.internal.ToolkitInternal; +import org.terracotta.toolkit.internal.ToolkitProperties; +import org.terracotta.toolkit.internal.cache.ToolkitCacheInternal; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @author tim + */ +public abstract class AbstractClusteredStoreTest { + protected Ehcache cache = mock(Ehcache.class); + protected CacheConfiguration cacheConfiguration = new CacheConfiguration().terracotta(new TerracottaConfiguration().clustered(true).consistency(TerracottaConfiguration.Consistency.EVENTUAL)); + protected Configuration configuration = new Configuration().name("ClusteredStoreTest-cm").terracotta(new 
TerracottaClientConfiguration()); + protected ToolkitCacheInternal toolkitCacheInternal = mock(ToolkitCacheInternal.class); + protected ClusteredStore clusteredStore; + private ToolkitInstanceFactory toolkitInstanceFactory = mock(ToolkitInstanceFactory.class); + protected CacheCluster cacheCluster = mockCacheCluster("abc"); + private CacheManager cacheManager = when(mock(CacheManager.class).getConfiguration()).thenReturn(configuration).getMock(); + protected ToolkitMap configMap = mock(ToolkitMap.class); + private ToolkitInternal toolkitInternal = mock(ToolkitInternal.class); + private ToolkitProperties toolkitProperties = mock(ToolkitProperties.class); + private org.terracotta.toolkit.config.Configuration toolkitCacheConfiguration = mock(org.terracotta.toolkit.config.Configuration.class); + private ToolkitNotifier toolkitNotifier = mock(ToolkitNotifier.class); + private CacheStoreHelper cacheStoreHelper = mock(CacheStoreHelper.class); + private ToolkitLock toolkitLock = mock(ToolkitLock.class); + + @Before + public void setUpClusteredStore() { + when(cache.getCacheConfiguration()).thenReturn(cacheConfiguration); + when(cache.getCacheManager()).thenReturn(cacheManager); + when(cache.getName()).thenReturn("ClusteredStoreTest-cache"); + when(cache.getStatus()).thenReturn(Status.STATUS_ALIVE); + when(cache.getCacheEventNotificationService()).thenReturn(new RegisteredEventListeners(cache, cacheStoreHelper)); + when(cacheManager.getName()).thenReturn("ClusteredStoreTest-cm"); + when(toolkitInstanceFactory.getOrCreateClusteredStoreConfigMap(eq("ClusteredStoreTest-cm"), eq("ClusteredStoreTest-cache"))).thenReturn(configMap); + when(toolkitInstanceFactory.getToolkit()).thenReturn(toolkitInternal); + when(toolkitInstanceFactory.getLockForCache(any(Ehcache.class), anyString())).thenReturn(toolkitLock); + when(toolkitInternal.getProperties()).thenReturn(toolkitProperties); + when(toolkitProperties.getBoolean(anyString())).thenReturn(false); + 
when(toolkitInstanceFactory.getOrCreateToolkitCache(cache, false)).thenReturn(toolkitCacheInternal); + when(toolkitCacheInternal.getConfiguration()).thenReturn(toolkitCacheConfiguration); + when(toolkitCacheConfiguration.getInt(anyString())).thenReturn(1); + when(toolkitInstanceFactory.getOrCreateConfigChangeNotifier(eq(cache))).thenReturn(toolkitNotifier); + clusteredStore = new ClusteredStore(toolkitInstanceFactory, cache, cacheCluster) { + @Override + void setUpWanConfig() { + // Do Nothing + } + }; + when(cacheStoreHelper.getStore()).thenReturn(clusteredStore); + } + + private static CacheCluster mockCacheCluster(String thisNode) { + CacheCluster cacheCluster = mock(CacheCluster.class); + ClusterNode node = when(mock(ClusterNode.class).getId()).thenReturn(thisNode).getMock(); + when(cacheCluster.getCurrentNode()).thenReturn(node); + return cacheCluster; + } +} Index: trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/ToolkitInstanceFactoryImplTest.java =================================================================== diff -u -N -r8446 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/ToolkitInstanceFactoryImplTest.java (.../ToolkitInstanceFactoryImplTest.java) (revision 8446) +++ trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/ToolkitInstanceFactoryImplTest.java (.../ToolkitInstanceFactoryImplTest.java) (revision 8447) @@ -75,19 +75,15 @@ private ToolkitInstanceFactoryImpl factory; private ToolkitCacheInternal resultantCache; + private ToolkitMap toolkitMap; @Before public void setUp() { MockitoAnnotations.initMocks(this); toolkit = mock(Toolkit.class); - ToolkitMap toolkitMap = mock(ToolkitMap.class); + toolkitMap = mockMap(); when(toolkit.getMap(anyString(), any(Class.class), any(Class.class))).thenReturn(toolkitMap); - ToolkitReadWriteLock rwLock = mock(ToolkitReadWriteLock.class); - when(toolkitMap.getReadWriteLock()).thenReturn(rwLock); - ToolkitLock 
lock = mock(ToolkitLock.class); - when(lock.tryLock()).thenReturn(true); - when(rwLock.readLock()).thenReturn(lock); - when(rwLock.writeLock()).thenReturn(lock); + ToolkitReadWriteLock rwLock = mockReadWriteLock(); when(toolkit.getReadWriteLock(any(String.class))).thenReturn(rwLock); makeToolkitReturnNonStopConfigurationRegistry(); @@ -111,6 +107,25 @@ factory.linkClusteredCacheManager(CACHE_MANAGER_NAME, null); } + private ToolkitMap mockMap() { + ToolkitMap map = mock(ToolkitMap.class); + ToolkitReadWriteLock readWriteLock = mockReadWriteLock(); + when(map.getReadWriteLock()).thenReturn(readWriteLock); + return map; + } + + private ToolkitReadWriteLock mockReadWriteLock() { + ToolkitReadWriteLock readWriteLock = mock(ToolkitReadWriteLock.class); + ToolkitLock lock = mockLock(); + when(readWriteLock.readLock()).thenReturn(lock); + when(readWriteLock.writeLock()).thenReturn(lock); + return readWriteLock; + } + + private ToolkitLock mockLock() { + return when(mock(ToolkitLock.class).tryLock()).thenReturn(true).getMock(); + } + @Test public void testGetOrCreateToolkitCacheForWanEnabled() throws Exception { whenCacheIsWanEnabled().callGetOrCreateToolkitCache(true).assertInstanceOfWanAwareToolkitCache(true); Index: trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/NoOpOnTimeoutStore.java =================================================================== diff -u -N -r8351 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/NoOpOnTimeoutStore.java (.../NoOpOnTimeoutStore.java) (revision 8351) +++ trunk/ehcache/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/nonstop/NoOpOnTimeoutStore.java (.../NoOpOnTimeoutStore.java) (revision 8447) @@ -33,7 +33,7 @@ import java.util.Set; /** - * Implementation of {@link NonstopStore} which returns null for all get operations and does nothing for puts and + * Implementation of {@link TerracottaStore} which returns 
null for all get operations and does nothing for puts and * removes. * * @author Abhishek Sanoujam @@ -528,4 +528,8 @@ throw new UnsupportedOperationException(); } + @Override + public void notifyCacheEventListenersChanged() { + // no-op + } } Index: trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/ClusteredStoreTest.java =================================================================== diff -u -N -r8446 -r8447 --- trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/ClusteredStoreTest.java (.../ClusteredStoreTest.java) (revision 8446) +++ trunk/ehcache/terracotta/bootstrap/src/test/java/org/terracotta/modules/ehcache/store/ClusteredStoreTest.java (.../ClusteredStoreTest.java) (revision 8447) @@ -8,77 +8,27 @@ */ package org.terracotta.modules.ehcache.store; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import net.sf.ehcache.CacheManager; import net.sf.ehcache.Ehcache; import net.sf.ehcache.Element; -import net.sf.ehcache.cluster.CacheCluster; -import net.sf.ehcache.config.CacheConfiguration; -import net.sf.ehcache.config.TerracottaConfiguration; +import net.sf.ehcache.cluster.ClusterTopologyListener; +import net.sf.ehcache.event.CacheEventListener; +import net.sf.ehcache.event.CacheEventListenerAdapter; import net.sf.ehcache.store.DefaultElementValueComparator; - -import org.junit.Before; import org.junit.Test; -import org.terracotta.modules.ehcache.ToolkitInstanceFactory; -import org.terracotta.toolkit.collections.ToolkitMap; -import org.terracotta.toolkit.events.ToolkitNotifier; -import org.terracotta.toolkit.internal.ToolkitInternal; -import org.terracotta.toolkit.internal.ToolkitProperties; -import org.terracotta.toolkit.internal.cache.ToolkitCacheInternal; +import 
org.terracotta.toolkit.cache.ToolkitCacheListener; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Test that asserts quickSize is not called when {@link Ehcache} sizing methods are called. * * @author Ludovic Orban */ -public class ClusteredStoreTest { - - private final ToolkitInstanceFactory toolkitInstanceFactory = mock(ToolkitInstanceFactory.class); - private final Ehcache cache = mock(Ehcache.class); - private final CacheCluster cacheCluster = mock(CacheCluster.class); - private final CacheConfiguration cacheConfiguration = new CacheConfiguration().terracotta(new TerracottaConfiguration().clustered(true).consistency(TerracottaConfiguration.Consistency.EVENTUAL)); - private final CacheManager cacheManager = mock(CacheManager.class); - private final ToolkitMap configMap = mock(ToolkitMap.class); - private final ToolkitInternal toolkitInternal = mock(ToolkitInternal.class); - private final ToolkitProperties toolkitProperties = mock(ToolkitProperties.class); - private final ToolkitCacheInternal toolkitCacheInternal = mock(ToolkitCacheInternal.class); - private final org.terracotta.toolkit.config.Configuration toolkitCacheConfiguration = mock(org.terracotta.toolkit.config.Configuration.class); - private final ToolkitNotifier toolkitNotifier = mock(ToolkitNotifier.class); - private ClusteredStore clusteredStore; - - - @Before - public void setUpClusteredStore() { - when(cache.getCacheConfiguration()).thenReturn(cacheConfiguration); - when(cache.getCacheManager()).thenReturn(cacheManager); - when(cache.getName()).thenReturn("ClusteredStoreTest-cache"); - when(cacheManager.getName()).thenReturn("ClusteredStoreTest-cm"); - when(toolkitInstanceFactory.getOrCreateClusteredStoreConfigMap(eq("ClusteredStoreTest-cm"), eq("ClusteredStoreTest-cache"))).thenReturn(configMap); - 
when(toolkitInstanceFactory.getToolkit()).thenReturn(toolkitInternal); - when(toolkitInternal.getProperties()).thenReturn(toolkitProperties); - when(toolkitProperties.getBoolean(anyString())).thenReturn(false); - when(toolkitInstanceFactory.getOrCreateToolkitCache(eq(cache), eq(false))).thenReturn(toolkitCacheInternal); - when(toolkitCacheInternal.getConfiguration()).thenReturn(toolkitCacheConfiguration); - when(toolkitCacheConfiguration.getInt(anyString())).thenReturn(1); - when(toolkitInstanceFactory.getOrCreateConfigChangeNotifier(eq(cache))).thenReturn(toolkitNotifier); - clusteredStore = new ClusteredStore(toolkitInstanceFactory, cache, cacheCluster) { - @Override - void setUpWanConfig() { - // Do Nothing - } - - @Override - boolean isWANEnabled() { - return false; - } - }; - } - +public class ClusteredStoreTest extends AbstractClusteredStoreTest { @Test public void clusteredStore_getSize_calls_size_not_quickSize() throws Exception { clusteredStore.getSize(); @@ -112,4 +62,34 @@ public void clusteredStore_removeElement_throw_in_eventual_consistency() { clusteredStore.removeElement(new Element("key", "value"), new DefaultElementValueComparator(cacheConfiguration)); } + + @Test + public void testDispose() throws Exception { + clusteredStore.dispose(); + verify(toolkitCacheInternal).disposeLocally(); + verify(cacheCluster).removeTopologyListener(any(ClusterTopologyListener.class)); + verify(toolkitCacheInternal).removeListener(any(ToolkitCacheListener.class)); + } + + @Test + public void testRegisterToolkitCacheEventListener() throws Exception { + verify(toolkitCacheInternal, never()).addListener(any(ToolkitCacheListener.class)); + cache.getCacheEventNotificationService().registerListener(new CacheEventListenerAdapter()); + cache.getCacheEventNotificationService().registerListener(new CacheEventListenerAdapter()); + verify(toolkitCacheInternal, times(1)).addListener(any(ToolkitCacheListener.class)); + } + + @Test + public void 
testUnregisterToolkitCacheEventListener() throws Exception { + String thisNodeId = cacheCluster.getCurrentNode().getId(); + when(configMap.get(ClusteredStore.LEADER_NODE_ID)).thenReturn(thisNodeId); // make this node the leader + verify(toolkitCacheInternal, never()).addListener(any(ToolkitCacheListener.class)); + CacheEventListener listener = new CacheEventListenerAdapter(); + cache.getCacheEventNotificationService().registerListener(listener); + cache.getCacheEventNotificationService().registerListener(listener); + cache.getCacheEventNotificationService().unregisterListener(listener); + cache.getCacheEventNotificationService().unregisterListener(listener); + verify(toolkitCacheInternal, times(1)).removeListener(any(ToolkitCacheListener.class)); + verify(configMap).remove(ClusteredStore.LEADER_NODE_ID); // make sure we drop leader status + } }