001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.util; 018 019 import java.io.IOException; 020 import java.sql.SQLException; 021 import java.util.concurrent.TimeUnit; 022 import java.util.concurrent.atomic.AtomicBoolean; 023 024 import org.apache.activemq.broker.BrokerService; 025 import org.slf4j.Logger; 026 import org.slf4j.LoggerFactory; 027 028 /** 029 * @org.apache.xbean.XBean 030 */ 031 public class DefaultIOExceptionHandler implements IOExceptionHandler { 032 033 private static final Logger LOG = LoggerFactory 034 .getLogger(DefaultIOExceptionHandler.class); 035 private BrokerService broker; 036 private boolean ignoreAllErrors = false; 037 private boolean ignoreNoSpaceErrors = true; 038 private boolean ignoreSQLExceptions = true; 039 private boolean stopStartConnectors = false; 040 private String noSpaceMessage = "space"; 041 private String sqlExceptionMessage = ""; // match all 042 private long resumeCheckSleepPeriod = 5*1000; 043 private AtomicBoolean stopStartInProgress = new AtomicBoolean(false); 044 045 public void handle(IOException exception) { 046 if (ignoreAllErrors) { 047 LOG.info("Ignoring IO exception, " + exception, exception); 048 return; 049 } 050 051 if (ignoreNoSpaceErrors) { 052 Throwable cause = exception; 053 while (cause != null && cause instanceof IOException) { 054 if (cause.getMessage().contains(noSpaceMessage)) { 055 LOG.info("Ignoring no space left exception, " + exception, exception); 056 return; 057 } 058 cause = cause.getCause(); 059 } 060 } 061 062 if (ignoreSQLExceptions) { 063 Throwable cause = exception; 064 while (cause != null) { 065 if (cause instanceof SQLException && cause.getMessage().contains(sqlExceptionMessage)) { 066 LOG.info("Ignoring SQLException, " + exception, cause); 067 return; 068 } 069 cause = cause.getCause(); 070 } 071 } 072 073 if (stopStartConnectors) { 074 if (!stopStartInProgress.compareAndSet(false, true)) { 075 // we are already working on it 076 return; 077 } 078 LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception); 079 080 new Thread("stop transport connectors on IO exception") { 081 public void run() { 082 try { 083 ServiceStopper stopper = new ServiceStopper(); 084 broker.stopAllConnectors(stopper); 085 } catch (Exception e) { 086 LOG.warn("Failure occurred while stopping broker connectors", e); 087 } 088 } 089 }.start(); 090 091 // resume again 092 new Thread("restart transport connectors post IO exception") { 093 public void run() { 094 try { 095 while (isPersistenceAdapterDown()) { 096 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports"); 097 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod); 098 } 099 broker.startAllConnectors(); 100 } catch (Exception e) { 101 LOG.warn("Failure occurred while restarting broker connectors", e); 102 } finally { 103 stopStartInProgress.compareAndSet(true, false); 104 } 105 } 106 107 private boolean isPersistenceAdapterDown() { 108 boolean checkpointSuccess = false; 109 try { 110 broker.getPersistenceAdapter().checkpoint(true); 111 checkpointSuccess = true; 112 } catch (Throwable ignored) {} 113 return !checkpointSuccess; 114 } 115 }.start(); 116 117 return; 118 } 119 120 LOG.info("Stopping the broker due to IO exception, " + exception, exception); 121 new Thread("Stopping the broker due to IO exception") { 122 public void run() { 123 try { 124 broker.stop(); 125 } catch (Exception e) { 126 LOG.warn("Failure occurred while stopping broker", e); 127 } 128 } 129 }.start(); 130 } 131 132 public void setBrokerService(BrokerService broker) { 133 this.broker = broker; 134 } 135 136 public boolean isIgnoreAllErrors() { 137 return ignoreAllErrors; 138 } 139 140 public void setIgnoreAllErrors(boolean ignoreAllErrors) { 141 this.ignoreAllErrors = ignoreAllErrors; 142 } 143 144 public boolean isIgnoreNoSpaceErrors() { 145 return ignoreNoSpaceErrors; 146 } 147 148 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) { 149 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors; 150 } 151 152 public String getNoSpaceMessage() { 153 return noSpaceMessage; 154 } 155 156 public void setNoSpaceMessage(String noSpaceMessage) { 157 this.noSpaceMessage = noSpaceMessage; 158 } 159 160 public boolean isIgnoreSQLExceptions() { 161 return ignoreSQLExceptions; 162 } 163 164 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) { 165 this.ignoreSQLExceptions = ignoreSQLExceptions; 166 } 167 168 public String getSqlExceptionMessage() { 169 return sqlExceptionMessage; 170 } 171 172 public void setSqlExceptionMessage(String sqlExceptionMessage) { 173 this.sqlExceptionMessage = sqlExceptionMessage; 174 } 175 176 public boolean isStopStartConnectors() { 177 return stopStartConnectors; 178 } 179 180 public void setStopStartConnectors(boolean stopStartConnectors) { 181 this.stopStartConnectors = stopStartConnectors; 182 } 183 184 public long getResumeCheckSleepPeriod() { 185 return resumeCheckSleepPeriod; 186 } 187 188 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) { 189 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod; 190 } 191 }