Multi-Threads: SyncBlock, a replacement for the synchronized keyword
To prevent Java deadlocks, the class SyncBlock is intended as a drop-in replacement for the synchronized keyword.
When you call the method take() on a SyncBlock object, you will block for up to the specified maximum number of seconds; if you are still unable to obtain the lock on this object, an error will be thrown. You must ALWAYS call release() on any SyncBlock that you have obtained the lock on.
The SyncBlock differs from the synchronized keyword in that it is interruptible and that it will time out if it blocks for too long.
The SyncBlock enhances a normal java.util.concurrent.locks.Lock in two ways. First, it will interrupt the blocking thread if that thread holds the lock for too long. Second, if the thread holding the lock is no longer alive, a new lock object will be created and a fatal email will be generated with the details. If a lock fails to be obtained (which would otherwise have been a deadlock), a fatal email will be generated with the details of the two threads, the blocking thread will be interrupted, and the calling thread will have an error thrown.
Please see below an example of how to use it, the associated test cases, and the code listing itself.
com/aspc/DBObj/VirtualDB.java |
6686 /** 6687 * notify all the class listeners 6688 * @param type the type of change MODIFY,DELETE or CREATE 6689 * @param gk the global key to notify of. 6690 */ 6691 private void notifyDBClassListeners( final String type, final GlobalKey gk) 6692 { 6693 String key = gk.getClassId().toString(); 6694 6695 /** 6696 * DEADLOCK found when this was a synchronized block. 6697 * now we are using a SyncBlock lock object which will timeout after 2 minutes if not successful. 6698 * 6699 * Once you have taken the lock the next statement must be the start of the try block so we never leave this 6700 * section without releasing the lock. 6701 */ 6702 dbClassListenersLock.take(); 6703 try 6704 { 6705 ArrayList list = (ArrayList)dbClassListeners.get( key); 6706 6707 if( list != null) 6708 { 6709 for( int i = 0; i < list.size(); i++) 6710 { 6711 DBClassListener listener; 6712 6713 listener = (DBClassListener)list.get( i); 6714 6715 if( type.equals( DBData.NOTIFY_MODIFIED)) 6716 { 6717 listener.eventObjectModified( gk, this); 6718 } 6719 else if( type.equals( DBData.NOTIFY_DELETED)) 6720 { 6721 listener.eventObjectDeleted( gk, this); 6722 } 6723 else if( type.equals( DBData.NOTIFY_CREATED)) 6724 { 6725 listener.eventObjectCreated( gk, this); 6726 } 6727 else 6728 { 6729 LOGGER.error( "Wrong type:" + type); 6730 } 6731 } 6732 } 6733 } 6734 catch( Throwable t) 6735 { 6736 LOGGER.warn( "ignored exception in listener", t); // Sr, 12/05/2005 Bug #5224 6737 } 6738 finally 6739 { 6740 /** 6741 * Always release the lock if obtained. 6742 */ 6743 dbClassListenersLock.release(); 6744 } 6745 }
com/aspc/remote/util/misc/SyncBlock.java |
107 /** 108 * release the lock 109 */ 110 public void release() 111 { 112 syncLock.unlock(); 113 } 114 115 /** 116 * take the lock and throw an error if you can't get it. 117 */ 118 public void take() 119 { 120 try 121 { 122 SyncLock tempLock = syncLock; 123 if (syncLock.tryLock(blockSeconds, TimeUnit.SECONDS) == false) 124 { 125 Thread ownerThread = syncLock.getOwner(); 126 127 if( ownerThread.isAlive() == false) 128 { 129 synchronized( this) 130 { 131 if( tempLock == syncLock) 132 { 133 syncLock = new SyncLock(); 134 } 135 } 136 137 LOGGER.fatal(this + " never released by " + ownerThread); 138 take(); 139 return; 140 } 141 142 StringBuilder sb = new StringBuilder( toString()); 143 sb.append("\n"); 144 Thread currentThread = Thread.currentThread(); 145 146 sb.append("Failed to get lock for thread: " + currentThread + "\n"); 147 148 for (StackTraceElement ste : currentThread.getStackTrace()) 149 { 150 sb.append("\t" + ste + "\n"); 151 } 152 153 if( ownerThread != null) 154 { 155 sb.append("\nLock held by thread: " + ownerThread + "\n"); 156 157 for (StackTraceElement ste : ownerThread.getStackTrace()) 158 { 159 sb.append("\t" + ste + "\n"); 160 } 161 sb.append("Interrupting holding thread"); 162 ownerThread.interrupt(); 163 } 164 else 165 { 166 sb.append("NO OWNER THREAD found"); 167 } 168 169 LOGGER.fatal(sb.toString()); 170 171 throw new DataBaseError("could not get the lock on: " + name); 172 } 173 } 174 catch (InterruptedException ex) 175 { 176 Thread.interrupted(); 177 LOGGER.warn( "could not take lock on " + name, ex); 178 Thread.currentThread().interrupt(); 179 throw new DataBaseError("could not get the lock on: " + name, ex); 180 } 181 } 182 183 class SyncLock extends ReentrantLock 184 { 185 public SyncLock( ) 186 { 187 super( true); 188 } 189 /** 190 * get the owner thread 191 * @return the owner thread 192 */ 193 @Override 194 public Thread getOwner()//NOPMD 195 { 196 return super.getOwner(); 197 } 198 }
com/aspc/remote/util/misc/selftest/TestSyncBlock.java |
88 89 /** 90 * check we recover from a lock that is never released. 91 */ 92 public void testNeverReleased() throws InterruptedException 93 { 94 final SyncBlock block = new SyncBlock( "never release", 2); 95 96 Runnable r = new Runnable( ) 97 { 98 public void run() 99 { 100 block.take(); 101 } 102 }; 103 Thread t = new Thread( r); 104 t.start(); 105 106 t.join( 120000); 107 108 block.take(); 109 } 110 111 /** 112 * check that we actually do block 113 */ 114 @SuppressWarnings("empty-statement") 115 public void testBlock() throws InterruptedException 116 { 117 final SyncBlock block = new SyncBlock( "long time", 10); 118 119 Runnable r = new Runnable( ) 120 { 121 public void run() 122 { 123 block.take(); 124 try 125 { 126 Thread.sleep(120000); 127 } 128 catch (InterruptedException ex) 129 { 130 LOGGER.warn("interrupted"); 131 } 132 finally 133 { 134 block.release(); 135 } 136 } 137 }; 138 Thread t = new Thread( r); 139 t.start(); 140 141 t.join( 1000); 142 143 try 144 { 145 block.take(); 146 fail( "should not succeed"); 147 } 148 catch( Throwable tw) 149 { 150 ;// this is good 151 } 152 t.interrupt(); 153 154 t.join( 5000); 155 block.take(); 156 } 157 158 159 /** 160 * check that deadlocks are handled 161 * @throws Exception a test failure 162 */ 163 public void testDeadlockHandled() throws Exception 164 { 165 a=new A(); 166 b=new B(); 167 Thread at = new Thread( a); 168 169 at.start(); 170 Thread bt = new Thread( b); 171 172 bt.start(); 173 174 long start = System.currentTimeMillis(); 175 while( start + 120000 > System.currentTimeMillis()) 176 { 177 if( a.calling && b.calling ) break; 178 Thread.sleep(100); 179 } 180 181 synchronized( marker) 182 { 183 marker.notifyAll(); 184 } 185 LOGGER.info("waiting for detection"); 186 at.join(240000); 187 bt.join(240000); 188 189 if( a.theException == null && b.theException == null) 190 { 191 fail( "The threads were not interrupted"); 192 } 193 194 assertFalse( "should have finished", at.isAlive()); 195 assertFalse( "should have 
finished", bt.isAlive()); 196 } 197 198 class A implements Runnable 199 { 200 private final SyncBlock block = new SyncBlock("A block", 2); 201 boolean calling; 202 Throwable theException; 203 204 public void run() 205 { 206 try 207 { 208 callB(); 209 } 210 catch( Throwable e) 211 { 212 theException = e; 213 LOGGER.warn( "got cancelled", e); 214 } 215 } 216 217 public void hello() 218 { 219 block.take(); 220 try 221 { 222 LOGGER.info("hello A"); 223 } 224 finally 225 { 226 block.release(); 227 } 228 } 229 230 private void callB() throws InterruptedException 231 { 232 block.take(); 233 try 234 { 235 calling=true; 236 synchronized( marker) 237 { 238 marker.wait(120000); 239 } 240 LOGGER.info("call B"); 241 b.hello(); 242 } 243 finally 244 { 245 block.release(); 246 } 247 } 248 } 249 250 class B implements Runnable 251 { 252 boolean calling; 253 Throwable theException; 254 private final SyncBlock block = new SyncBlock("A block", 2); 255 256 public void run() 257 { 258 try 259 { 260 callA(); 261 } 262 catch( Throwable e) 263 { 264 theException = e; 265 LOGGER.warn( "got cancelled", e); 266 } 267 } 268 269 public void hello() 270 { 271 block.take(); 272 try 273 { 274 LOGGER.info("hello B"); 275 } 276 finally 277 { 278 block.release(); 279 } 280 } 281 282 private void callA() throws InterruptedException 283 { 284 block.take(); 285 try 286 { 287 calling=true; 288 synchronized( marker) 289 { 290 marker.wait(120000); 291 } 292 LOGGER.info("call A"); 293 a.hello(); 294 } 295 finally 296 { 297 block.release(); 298 } 299 } 300 } 301 302 private A a; 303 private B b;