Index: branches/nonstop/ehcache-core/pom.xml
===================================================================
diff -u -N -r6565 -r6589
--- branches/nonstop/ehcache-core/pom.xml (.../pom.xml) (revision 6565)
+++ branches/nonstop/ehcache-core/pom.xml (.../pom.xml) (revision 6589)
@@ -26,7 +26,6 @@
ehcache
false
false
- ${maven.build.timestamp}
2.10
@@ -49,9 +48,10 @@
provided
- org.eclipse.jetty.orbit
- javax.servlet
- 3.0.0.v201112011016
+ javax.servlet
+ javax.servlet-api
+ 3.0.1
+ provided
org.hibernate
@@ -503,11 +503,6 @@
is used (for example git). Instead of getting the version build number from
svn we will use the build date and the user name. -->
buildnumber-git
-
-
- .svn
-
-
Index: branches/nonstop/ehcache-scheduled-refresh/pom.xml
===================================================================
diff -u -N -r6565 -r6589
--- branches/nonstop/ehcache-scheduled-refresh/pom.xml (.../pom.xml) (revision 6565)
+++ branches/nonstop/ehcache-scheduled-refresh/pom.xml (.../pom.xml) (revision 6589)
@@ -6,7 +6,7 @@
net.sf.ehcache
ehcache-root
2.7.0-SNAPSHOT
-
+ ..
ehcache-scheduled-refresh
Index: branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/bulkload/BulkLoadToolkitCache.java
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/bulkload/BulkLoadToolkitCache.java (.../BulkLoadToolkitCache.java) (revision 6558)
+++ branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/bulkload/BulkLoadToolkitCache.java (.../BulkLoadToolkitCache.java) (revision 6589)
@@ -13,7 +13,6 @@
import org.terracotta.toolkit.internal.ToolkitLogger;
import org.terracotta.toolkit.internal.cache.ToolkitCacheInternal;
import org.terracotta.toolkit.search.QueryBuilder;
-import org.terracotta.toolkit.search.SearchExecutor;
import org.terracotta.toolkit.search.attribute.ToolkitAttributeExtractor;
import org.terracotta.toolkit.store.ToolkitStoreConfigFields;
@@ -386,11 +385,6 @@
}
@Override
- public SearchExecutor createSearchExecutor() {
- return toolkitCache.createSearchExecutor();
- }
-
- @Override
public QueryBuilder createQueryBuilder() {
return toolkitCache.createQueryBuilder();
}
@@ -465,4 +459,10 @@
public void setAttributeExtractor(ToolkitAttributeExtractor extractor) {
toolkitCache.setAttributeExtractor(extractor);
}
+
+ @Override
+ public Map unlockedGetAll(Collection keys, boolean quiet) {
+ return quiet ? getAllQuiet(keys) : getAll(keys);
+ }
+
}
Index: branches/nonstop/pom.xml
===================================================================
diff -u -N -r6575 -r6589
--- branches/nonstop/pom.xml (.../pom.xml) (revision 6575)
+++ branches/nonstop/pom.xml (.../pom.xml) (revision 6589)
@@ -29,6 +29,29 @@
management-ehcache-impl
ehcache-scheduled-refresh
+
+
+
+
+ org.terracotta
+ maven-forge-plugin
+ ${maven-forge-plugin.version}
+
+
+ generateBuildInfoFile
+ generate-resources
+
+ buildinfo
+
+
+ true
+ ${project.build.outputDirectory}
+
+
+
+
+
+
@@ -50,6 +73,11 @@
devmode
+
+
+ ${user.home}/.devmode
+
+
@@ -65,6 +93,7 @@
+ println "================== devmode ===================="
def classesDir = new File(project.build.outputDirectory)
classesDir.mkdirs()
def targetFile = new File(classesDir, "devmode-classdir.txt")
Index: branches/nonstop/ehcache-core/src/main/java/net/sf/ehcache/search/impl/ResultsImpl.java
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/ehcache-core/src/main/java/net/sf/ehcache/search/impl/ResultsImpl.java (.../ResultsImpl.java) (revision 6558)
+++ branches/nonstop/ehcache-core/src/main/java/net/sf/ehcache/search/impl/ResultsImpl.java (.../ResultsImpl.java) (revision 6589)
@@ -45,7 +45,7 @@
* @param hasAttributes
* @param hasAggregators
*/
- public ResultsImpl(List results, boolean hasKeys, boolean hasValues, boolean hasAttributes, boolean hasAggregators) {
+ public ResultsImpl(List extends Result> results, boolean hasKeys, boolean hasValues, boolean hasAttributes, boolean hasAggregators) {
this.hasKeys = hasKeys;
this.hasValues = hasValues;
this.hasAttributes = hasAttributes;
Index: branches/nonstop/system-tests/src/test/java/org/terracotta/modules/ehcache/l1bm/L1BMOnHeapActivePassiveSanityTest.java
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/system-tests/src/test/java/org/terracotta/modules/ehcache/l1bm/L1BMOnHeapActivePassiveSanityTest.java (.../L1BMOnHeapActivePassiveSanityTest.java) (revision 6558)
+++ branches/nonstop/system-tests/src/test/java/org/terracotta/modules/ehcache/l1bm/L1BMOnHeapActivePassiveSanityTest.java (.../L1BMOnHeapActivePassiveSanityTest.java) (revision 6589)
@@ -12,7 +12,8 @@
public L1BMOnHeapActivePassiveSanityTest(TestConfig testConfig) {
super(testConfig, L1BMOnHeapBasicSanityTestApp.class, L1BMOnHeapBasicSanityTestApp.class);
- testConfig.getL2Config().setMaxHeap(512);
+ testConfig.getL2Config().setMaxHeap(1024);
+ testConfig.addTcProperty("ehcache.evictor.logging.enabled", "true");
testConfig.getCrashConfig().setCrashMode(ServerCrashMode.RANDOM_SERVER_CRASH);
testConfig.getCrashConfig().setServerCrashWaitTimeInSec(30);
}
Index: branches/nonstop/ehcache-scheduled-refresh/src/test/java/net/sf/ehcache/constructs/scheduledrefresh/IncrementingCacheLoader.java
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/ehcache-scheduled-refresh/src/test/java/net/sf/ehcache/constructs/scheduledrefresh/IncrementingCacheLoader.java (.../IncrementingCacheLoader.java) (revision 6558)
+++ branches/nonstop/ehcache-scheduled-refresh/src/test/java/net/sf/ehcache/constructs/scheduledrefresh/IncrementingCacheLoader.java (.../IncrementingCacheLoader.java) (revision 6589)
@@ -1,18 +1,19 @@
package net.sf.ehcache.constructs.scheduledrefresh;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
import net.sf.ehcache.CacheException;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Status;
import net.sf.ehcache.loader.CacheLoader;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
class IncrementingCacheLoader implements CacheLoader {
private final int incrementalAmount;
private boolean matchEvens;
-
+ private long msDelay=0L;
+
public IncrementingCacheLoader(boolean matchEvens, int inc) {
this.incrementalAmount = inc;
this.matchEvens = matchEvens;
@@ -23,6 +24,14 @@
return loadAll(keys);
}
+ public long getMsDelay() {
+ return msDelay;
+ }
+
+ public void setMsDelay(long msDelay) {
+ this.msDelay = msDelay;
+ }
+
@Override
public Map loadAll(Collection keys) {
@@ -43,6 +52,12 @@
@Override
public Object load(Object key) throws CacheException {
+ if(getMsDelay()>0L) {
+ try {
+ Thread.sleep(getMsDelay());
+ } catch (InterruptedException e) {
+ }
+ }
if (key instanceof Number) {
int ivalue = ((Number) key).intValue();
if ((((ivalue & 0x01) != 0) && !matchEvens) || ((ivalue & 0x01) == 0) && matchEvens) {
Index: branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredStoreBackend.java
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredStoreBackend.java (.../ClusteredStoreBackend.java) (revision 6558)
+++ branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/store/ClusteredStoreBackend.java (.../ClusteredStoreBackend.java) (revision 6589)
@@ -12,7 +12,6 @@
import org.terracotta.toolkit.internal.ToolkitInternal;
import org.terracotta.toolkit.internal.cache.ToolkitCacheInternal;
import org.terracotta.toolkit.search.QueryBuilder;
-import org.terracotta.toolkit.search.SearchExecutor;
import org.terracotta.toolkit.search.attribute.ToolkitAttributeExtractor;
import java.io.Serializable;
@@ -50,16 +49,6 @@
}
@Override
- public SearchExecutor createSearchExecutor() {
- lock.readLock().lock();
- try {
- return activeDelegate.createSearchExecutor();
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
public V put(K key, V value, int createTimeInSecs, int customMaxTTISeconds, int customMaxTTLSeconds) {
lock.readLock().lock();
try {
@@ -610,5 +599,14 @@
}
}
+ @Override
+ public Map unlockedGetAll(Collection keys, boolean quiet) {
+ lock.readLock().lock();
+ try {
+ return activeDelegate.unlockedGetAll(keys, quiet);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
}
Index: branches/nonstop/management-ehcache/pom.xml
===================================================================
diff -u -N -r6575 -r6589
--- branches/nonstop/management-ehcache/pom.xml (.../pom.xml) (revision 6575)
+++ branches/nonstop/management-ehcache/pom.xml (.../pom.xml) (revision 6589)
@@ -6,6 +6,7 @@
net.sf.ehcache
ehcache-root
2.7.0-SNAPSHOT
+ ..
net.sf.ehcache
Index: branches/nonstop/management-ehcache-impl/pom.xml
===================================================================
diff -u -N -r6575 -r6589
--- branches/nonstop/management-ehcache-impl/pom.xml (.../pom.xml) (revision 6575)
+++ branches/nonstop/management-ehcache-impl/pom.xml (.../pom.xml) (revision 6589)
@@ -6,6 +6,7 @@
net.sf.ehcache
ehcache-root
2.7.0-SNAPSHOT
+ ..
net.sf.ehcache
Index: branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/collections/SerializedToolkitCache.java
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/collections/SerializedToolkitCache.java (.../SerializedToolkitCache.java) (revision 6558)
+++ branches/nonstop/terracotta/bootstrap/src/main/java/org/terracotta/modules/ehcache/collections/SerializedToolkitCache.java (.../SerializedToolkitCache.java) (revision 6589)
@@ -14,7 +14,6 @@
import org.terracotta.toolkit.concurrent.locks.ToolkitReadWriteLock;
import org.terracotta.toolkit.config.Configuration;
import org.terracotta.toolkit.search.QueryBuilder;
-import org.terracotta.toolkit.search.SearchExecutor;
import org.terracotta.toolkit.search.attribute.ToolkitAttributeExtractor;
import java.io.IOException;
@@ -499,8 +498,4 @@
return this.toolkitCache.createQueryBuilder();
}
- @Override
- public SearchExecutor createSearchExecutor() {
- return this.toolkitCache.createSearchExecutor();
- }
}
Index: branches/nonstop/system-tests/src/test/resources/failover-during-passive-sync-test.xml
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/system-tests/src/test/resources/failover-during-passive-sync-test.xml (.../failover-during-passive-sync-test.xml) (revision 6558)
+++ branches/nonstop/system-tests/src/test/resources/failover-during-passive-sync-test.xml (.../failover-during-passive-sync-test.xml) (revision 6589)
@@ -7,7 +7,7 @@
eternal="true"/>
Index: branches/nonstop/ehcache/pom.xml
===================================================================
diff -u -N -r6575 -r6589
--- branches/nonstop/ehcache/pom.xml (.../pom.xml) (revision 6575)
+++ branches/nonstop/ehcache/pom.xml (.../pom.xml) (revision 6589)
@@ -6,6 +6,7 @@
net.sf.ehcache
ehcache-root
2.7.0-SNAPSHOT
+ ..
ehcache
Index: branches/nonstop/ehcache-scheduled-refresh/src/test/java/net/sf/ehcache/constructs/scheduledrefresh/ScheduledRefreshCacheExtensionTest.java
===================================================================
diff -u -N -r6558 -r6589
--- branches/nonstop/ehcache-scheduled-refresh/src/test/java/net/sf/ehcache/constructs/scheduledrefresh/ScheduledRefreshCacheExtensionTest.java (.../ScheduledRefreshCacheExtensionTest.java) (revision 6558)
+++ branches/nonstop/ehcache-scheduled-refresh/src/test/java/net/sf/ehcache/constructs/scheduledrefresh/ScheduledRefreshCacheExtensionTest.java (.../ScheduledRefreshCacheExtensionTest.java) (revision 6589)
@@ -1,21 +1,21 @@
package net.sf.ehcache.constructs.scheduledrefresh;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+
import junit.framework.Assert;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.loader.CacheLoader;
+
import org.junit.Test;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-
public class ScheduledRefreshCacheExtensionTest {
- private static CacheLoader stupidCacheLoaderOdds = new OddCacheLoader();
- private static CacheLoader stupidCacheLoaderEvens = new EvenCacheLoader();
+ private static OddCacheLoader stupidCacheLoaderOdds = new OddCacheLoader();
+ private static EvenCacheLoader stupidCacheLoaderEvens = new EvenCacheLoader();
private static void sleepySeconds(int secs) {
sleepy(secs * 1000);
@@ -109,4 +109,41 @@
manager.shutdown();
}
+
+ // OK. We want to create an ehcache, then programmatically decorate it with
+ // locks.
+ @Test
+ public void testPolling() {
+
+ CacheManager manager = new CacheManager();
+ manager.removalAll();
+
+ manager.addCache(new Cache(new CacheConfiguration().name("tt").eternal(true).maxEntriesLocalHeap(5000).overflowToDisk(false)));
+ Ehcache cache = manager.getEhcache("tt");
+ stupidCacheLoaderEvens.setMsDelay(100);
+ cache.registerCacheLoader(stupidCacheLoaderEvens);
+ cache.registerCacheLoader(stupidCacheLoaderOdds);
+
+ int second = (new GregorianCalendar().get(Calendar.SECOND) + 5) % 60;
+ ScheduledRefreshConfiguration config = new ScheduledRefreshConfiguration().batchSize(2).quartzThreadCount
+ (2).pollTimeMs(100).cronExpression(second + "/1 * * * * ?").build();
+ ScheduledRefreshCacheExtension cacheExtension = new ScheduledRefreshCacheExtension(config, cache);
+ cache.registerCacheExtension(cacheExtension);
+ cacheExtension.init();
+
+ final int ELEMENT_COUNT=50;
+ long[] orig=new long[ELEMENT_COUNT];
+ for (int i = 0; i < ELEMENT_COUNT; i++) {
+ Element elem = new Element(new Integer(i), i + "");
+ orig[i]=elem.getCreationTime();
+ cache.put(elem);
+ }
+
+ sleepySeconds(20);
+
+ //cacheExtension.dispose();
+ manager.removalAll();
+ manager.shutdown();
+ }
+
}
\ No newline at end of file
Index: branches/nonstop/ehcache-scheduled-refresh/src/main/java/net/sf/ehcache/constructs/scheduledrefresh/OverseerJob.java
===================================================================
diff -u -N -r6575 -r6589
--- branches/nonstop/ehcache-scheduled-refresh/src/main/java/net/sf/ehcache/constructs/scheduledrefresh/OverseerJob.java (.../OverseerJob.java) (revision 6575)
+++ branches/nonstop/ehcache-scheduled-refresh/src/main/java/net/sf/ehcache/constructs/scheduledrefresh/OverseerJob.java (.../OverseerJob.java) (revision 6589)
@@ -24,6 +24,7 @@
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
+import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
@@ -46,153 +47,161 @@
* other jobs run, and is responsible for starting all the individual refresh
* jobs, enabling bulk load mode beforehand, and disabling bulk load mode
* afterwards.
- *
+ *
* @author cschanck
*/
+@DisallowConcurrentExecution
public class OverseerJob implements Job {
- private static final Logger LOG = LoggerFactory.getLogger(OverseerJob.class);
+ private static final Logger LOG = LoggerFactory.getLogger(OverseerJob.class);
- private static final AtomicLong INSTANCE_ID_GENERATOR = new AtomicLong(0);
+ private static final AtomicLong INSTANCE_ID_GENERATOR = new AtomicLong(0);
- @Override
- public void execute(JobExecutionContext context) throws JobExecutionException {
+ @Override
+ public void execute(JobExecutionContext context) throws JobExecutionException {
- JobDataMap jdm = context.getMergedJobDataMap();
- ScheduledRefreshConfiguration config = (ScheduledRefreshConfiguration) jdm
- .get(ScheduledRefreshCacheExtension.PROP_CONFIG_OBJECT);
- String cacheManagerName = jdm.getString(ScheduledRefreshCacheExtension.PROP_CACHE_MGR_NAME);
- String cacheName = jdm.getString(ScheduledRefreshCacheExtension.PROP_CACHE_NAME);
+ try {
+ JobDataMap jdm = context.getMergedJobDataMap();
+ ScheduledRefreshConfiguration config = (ScheduledRefreshConfiguration) jdm
+ .get(ScheduledRefreshCacheExtension.PROP_CONFIG_OBJECT);
+ String cacheManagerName = jdm.getString(ScheduledRefreshCacheExtension.PROP_CACHE_MGR_NAME);
+ String cacheName = jdm.getString(ScheduledRefreshCacheExtension.PROP_CACHE_NAME);
- final CacheManager cacheManager = CacheManager.getCacheManager(cacheManagerName);
+ final CacheManager cacheManager = CacheManager.getCacheManager(cacheManagerName);
- if (cacheManager == null) {
+ if (cacheManager == null) {
LOG.warn("Unable to process Scheduled Refresh batch" + context.getJobDetail().getKey() + ": cache "
- + "manager " + cacheManager + " not found");
+ + "manager " + cacheManagerName + " not found"); // cacheManager is null on this branch; log the name that was looked up
return;
- }
+ }
- final Ehcache cache = cacheManager.getEhcache(cacheName);
- if (cache == null) {
+ final Ehcache cache = cacheManager.getEhcache(cacheName);
+ if (cache == null) {
LOG.warn("Unable to process Scheduled Refresh batch" + context.getJobDetail().getKey() + ": cache "
- + cacheName + " not found");
+ + cacheName + " not found");
return;
- }
+ }
- ScheduledRefreshKeyGenerator generator = makeGeneratorObject(config.getKeyGeneratorClass());
- if (generator != null) {
+ ScheduledRefreshKeyGenerator generator = makeGeneratorObject(config.getKeyGeneratorClass());
+ if (generator != null) {
Scheduler scheduler = context.getScheduler();
- try {
- if (getOutstandingJobCount(context, scheduler) == 1) {
- LOG.info("Starting Scheduled refresh: " + config.toString(cache));
- processKeys(context, config, cache, generator);
- if (config.isUseBulkload()) {
- try {
- waitForOutstandingJobCount(context, config, scheduler, 0);
- } catch (SchedulerException e) {
- LOG.warn("Unable to process Scheduled Refresh batch termination"
- + context.getJobDetail().getKey(), e);
- }
- }
- } else {
- LOG.info("Skipping overlapping execution for Scheduled Refresh batch "
- + context.getJobDetail().getKey());
- }
- } catch (SchedulerException e) {
- try {
- if (!scheduler.isShutdown()) {
- LOG.warn("Unable to process Scheduled Refresh batch " + context.getJobDetail().getKey(), e);
- }
- } catch (SchedulerException e1) {
- LOG.warn(e1.getMessage(), e1);
- }
+ if (getOutstandingJobCount(context, scheduler) == 1) {
+ LOG.info("Starting Scheduled refresh: " + config.toString(cache));
+ processKeys(context, config, cache, generator);
+ if (config.isUseBulkload()) {
+ try {
+ waitForOutstandingJobCount(context, config, scheduler, 0);
+ } catch (SchedulerException e) {
+ LOG.warn(
+ "Unable to process Scheduled Refresh batch termination" + context.getJobDetail().getKey(), e);
+ }
+ }
+ } else {
+ LOG.info("Skipping overlapping execution for Scheduled Refresh batch " + context.getJobDetail().getKey());
}
- }
- }
- private int getOutstandingJobCount(JobExecutionContext context, Scheduler scheduler) throws SchedulerException {
- GroupMatcher matcher = GroupMatcher.jobGroupEquals(context.getJobDetail().getKey().getGroup());
- Set queuedKeys = scheduler.getJobKeys(matcher);
- return queuedKeys.size();
- }
-
- private void waitForOutstandingJobCount(JobExecutionContext context, ScheduledRefreshConfiguration config,
- Scheduler scheduler, int minCount) throws SchedulerException {
- GroupMatcher matcher = GroupMatcher.jobGroupEquals(context.getJobDetail().getKey().getGroup());
- for (Set queuedKeys = scheduler.getJobKeys(matcher); (!scheduler.isShutdown())
- && (queuedKeys.size() > minCount); queuedKeys = scheduler.getJobKeys(matcher)) {
- try {
- Thread.sleep(config.getPollTimeMs());
- } catch (InterruptedException e) {
+ }
+ } catch (SchedulerException e) {
+ try {
+ if (!context.getScheduler().isShutdown()) {
+ LOG.warn("Unable to process Scheduled Refresh batch " + context.getJobDetail().getKey(), e);
+ throw e;
}
- }
- }
+ } catch (SchedulerException e1) {
+ LOG.warn(e1.getMessage(), e1);
+ }
+ }
+ }
- private void processKeys(JobExecutionContext context, ScheduledRefreshConfiguration config, final Ehcache cache,
- ScheduledRefreshKeyGenerator generator) throws JobExecutionException {
- ArrayList batch = new ArrayList(config.getBatchSize());
- for (Serializable key : generator.generateKeys(cache)) {
- batch.add(key);
- if (batch.size() >= config.getBatchSize()) {
- try {
- process(context, cache, config, batch);
- batch = new ArrayList();
- } catch (SchedulerException e) {
- LOG.warn("Unable to process Scheduled Refresh batch" + context.getJobDetail().getKey(), e);
- throw new JobExecutionException(e);
- }
- batch.clear();
- }
- }
- if (!batch.isEmpty()) {
+ private int getOutstandingJobCount(JobExecutionContext context, Scheduler scheduler) throws SchedulerException {
+ GroupMatcher matcher = GroupMatcher.jobGroupEquals(context.getJobDetail().getKey().getGroup());
+ Set queuedKeys = scheduler.getJobKeys(matcher);
+ return queuedKeys.size();
+ }
+
+ private void waitForOutstandingJobCount(JobExecutionContext context, ScheduledRefreshConfiguration config,
+ Scheduler scheduler, int minCount) throws SchedulerException {
+ GroupMatcher matcher = GroupMatcher.jobGroupEquals(context.getJobDetail().getKey().getGroup());
+ for (Set queuedKeys = scheduler.getJobKeys(matcher); (!scheduler.isShutdown())
+ && (queuedKeys.size() > minCount); queuedKeys = scheduler.getJobKeys(matcher)) {
+ try {
+ Thread.sleep(config.getPollTimeMs());
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private void processKeys(JobExecutionContext context, ScheduledRefreshConfiguration config, final Ehcache cache,
+ ScheduledRefreshKeyGenerator generator) throws JobExecutionException {
+ ArrayList batch = new ArrayList(config.getBatchSize());
+ for (Serializable key : generator.generateKeys(cache)) {
+ batch.add(key);
+ if (batch.size() >= config.getBatchSize()) {
try {
- process(context, cache, config, batch);
+ process(context, cache, config, batch);
+ batch = new ArrayList();
} catch (SchedulerException e) {
- LOG.warn("Unable to process Scheduled Refresh batch" + context.getJobDetail().getKey(), e);
- throw new JobExecutionException(e);
+ LOG.warn("Unable to process Scheduled Refresh batch" + context.getJobDetail().getKey(), e);
+ throw new JobExecutionException(e);
}
- }
- }
+ batch.clear();
+ }
+ }
+ if (!batch.isEmpty()) {
+ try {
+ process(context, cache, config, batch);
+ } catch (SchedulerException e) {
+ LOG.warn("Unable to process Scheduled Refresh batch" + context.getJobDetail().getKey(), e);
+ throw new JobExecutionException(e);
+ }
+ }
+ }
- private void process(JobExecutionContext context, Ehcache underlyingCache, ScheduledRefreshConfiguration config,
- List batch) throws SchedulerException {
+ private void process(JobExecutionContext context, Ehcache underlyingCache, ScheduledRefreshConfiguration config,
+ List batch) throws SchedulerException {
- JobDataMap map = new JobDataMap(context.getJobDetail().getJobDataMap());
+ JobDataMap map = new JobDataMap(context.getJobDetail().getJobDataMap());
- map.put(ScheduledRefreshCacheExtension.PROP_KEYS_TO_PROCESS, batch);
+ map.put(ScheduledRefreshCacheExtension.PROP_KEYS_TO_PROCESS, batch);
- Scheduler scheduler = context.getScheduler();
+ Scheduler scheduler = context.getScheduler();
- JobDetail job = JobBuilder
- .newJob(RefreshBatchJob.class)
- .withIdentity("batch_" + INSTANCE_ID_GENERATOR.incrementAndGet(),
- context.getTrigger().getJobKey().getGroup()).usingJobData(map).build();
+ JobDetail job = JobBuilder
+ .newJob(RefreshBatchJob.class)
+ .withIdentity("batch_" + INSTANCE_ID_GENERATOR.incrementAndGet(),
+ context.getTrigger().getJobKey().getGroup()).usingJobData(map).build();
- waitForOutstandingJobCount(context, config, scheduler, config.getQuartzThreadCount());
+ try {
+ waitForOutstandingJobCount(context, config, scheduler, config.getQuartzThreadCount());
- if (!scheduler.isShutdown()) {
+ if (!scheduler.isShutdown()) {
Trigger trigger = TriggerBuilder.newTrigger().startNow().forJob(job).build();
scheduler.addJob(job, true);
scheduler.scheduleJob(trigger);
- }
- }
+ }
+ } catch (SchedulerException e) {
+ if (!scheduler.isShutdown()) {
+ throw e;
+ }
+ }
+ }
- private ScheduledRefreshKeyGenerator makeGeneratorObject(String keyGeneratorClass) {
- try {
- Class> gen = Class.forName(keyGeneratorClass);
- @SuppressWarnings("unchecked")
- ScheduledRefreshKeyGenerator obj = (ScheduledRefreshKeyGenerator) gen
- .newInstance();
- return obj;
- } catch (ClassNotFoundException e) {
- LOG.warn("Unable to instantiate key generator class: " + keyGeneratorClass, e);
- } catch (InstantiationException e) {
- LOG.warn("Unable to instantiate key generator class: " + keyGeneratorClass, e);
- } catch (IllegalAccessException e) {
- LOG.warn("Unable to instantiate key generator class: " + keyGeneratorClass, e);
- }
- return null;
+ private ScheduledRefreshKeyGenerator makeGeneratorObject(String keyGeneratorClass) {
+ try {
+ Class> gen = Class.forName(keyGeneratorClass);
+ @SuppressWarnings("unchecked")
+ ScheduledRefreshKeyGenerator obj = (ScheduledRefreshKeyGenerator) gen
+ .newInstance();
+ return obj;
+ } catch (ClassNotFoundException e) {
+ LOG.warn("Unable to instantiate key generator class: " + keyGeneratorClass, e);
+ } catch (InstantiationException e) {
+ LOG.warn("Unable to instantiate key generator class: " + keyGeneratorClass, e);
+ } catch (IllegalAccessException e) {
+ LOG.warn("Unable to instantiate key generator class: " + keyGeneratorClass, e);
+ }
+ return null;
- }
+ }
}