001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.kaha.impl.async; 018 019 import java.io.ByteArrayInputStream; 020 import java.io.ByteArrayOutputStream; 021 import java.io.DataInputStream; 022 import java.io.DataOutputStream; 023 import java.io.File; 024 import java.io.FilenameFilter; 025 import java.io.IOException; 026 import java.util.ArrayList; 027 import java.util.Collections; 028 import java.util.HashMap; 029 import java.util.HashSet; 030 import java.util.Iterator; 031 import java.util.LinkedHashMap; 032 import java.util.List; 033 import java.util.Map; 034 import java.util.Set; 035 import java.util.concurrent.ConcurrentHashMap; 036 import java.util.concurrent.atomic.AtomicLong; 037 import java.util.concurrent.atomic.AtomicReference; 038 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; 039 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; 040 import org.apache.activemq.thread.Scheduler; 041 import org.apache.activemq.util.ByteSequence; 042 import org.apache.activemq.util.IOHelper; 043 import org.slf4j.Logger; 044 import org.slf4j.LoggerFactory; 045 046 047 048 /** 049 * Manages DataFiles 050 * 051 * 052 */ 053 public class AsyncDataManager { 054 055 public static final int CONTROL_RECORD_MAX_LENGTH = 1024; 056 public static final int ITEM_HEAD_RESERVED_SPACE = 21; 057 // ITEM_HEAD_SPACE = length + type+ reserved space + SOR 058 public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3; 059 public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3; 060 public static final int ITEM_FOOT_SPACE = 3; // EOR 061 062 public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE; 063 064 public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; // 065 public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; // 066 067 public static final byte DATA_ITEM_TYPE = 1; 068 public static final byte REDO_ITEM_TYPE = 2; 069 public static final String DEFAULT_DIRECTORY = "data"; 070 public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; 071 public static final String DEFAULT_FILE_PREFIX = "data-"; 072 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 073 public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; 074 public static final int PREFERED_DIFF = 1024 * 512; 075 076 private static final Logger LOG = LoggerFactory.getLogger(AsyncDataManager.class); 077 protected Scheduler scheduler; 078 079 protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); 080 081 protected File directory = new File(DEFAULT_DIRECTORY); 082 protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY); 083 protected String filePrefix = DEFAULT_FILE_PREFIX; 084 protected ControlFile controlFile; 085 protected boolean started; 086 protected boolean useNio = true; 087 088 protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; 089 protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF; 090 091 protected DataFileAppender appender; 092 protected DataFileAccessorPool accessorPool; 093 094 protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); 095 protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>(); 096 protected DataFile currentWriteFile; 097 098 protected Location mark; 099 protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>(); 100 protected Runnable cleanupTask; 101 protected final AtomicLong storeSize; 102 protected boolean archiveDataLogs; 103 104 public AsyncDataManager(AtomicLong storeSize) { 105 this.storeSize=storeSize; 106 } 107 108 public AsyncDataManager() { 109 this(new AtomicLong()); 110 } 111 112 @SuppressWarnings("unchecked") 113 public synchronized void start() throws IOException { 114 if (started) { 115 return; 116 } 117 118 started = true; 119 preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF); 120 lock(); 121 122 accessorPool = new DataFileAccessorPool(this); 123 ByteSequence sequence = controlFile.load(); 124 if (sequence != null && sequence.getLength() > 0) { 125 unmarshallState(sequence); 126 } 127 if (useNio) { 128 appender = new NIODataFileAppender(this); 129 } else { 130 appender = new DataFileAppender(this); 131 } 132 133 File[] files = directory.listFiles(new FilenameFilter() { 134 public boolean accept(File dir, String n) { 135 return dir.equals(directory) && n.startsWith(filePrefix); 136 } 137 }); 138 139 if (files != null) { 140 for (int i = 0; i < files.length; i++) { 141 try { 142 File file = files[i]; 143 String n = file.getName(); 144 String numStr = n.substring(filePrefix.length(), n.length()); 145 int num = Integer.parseInt(numStr); 146 DataFile dataFile = new DataFile(file, num, preferedFileLength); 147 fileMap.put(dataFile.getDataFileId(), dataFile); 148 storeSize.addAndGet(dataFile.getLength()); 149 } catch (NumberFormatException e) { 150 // Ignore file that do not match the pattern. 151 } 152 } 153 154 // Sort the list so that we can link the DataFiles together in the 155 // right order. 156 List<DataFile> l = new ArrayList<DataFile>(fileMap.values()); 157 Collections.sort(l); 158 currentWriteFile = null; 159 for (DataFile df : l) { 160 if (currentWriteFile != null) { 161 currentWriteFile.linkAfter(df); 162 } 163 currentWriteFile = df; 164 fileByFileMap.put(df.getFile(), df); 165 } 166 } 167 168 // Need to check the current Write File to see if there was a partial 169 // write to it. 170 if (currentWriteFile != null) { 171 172 // See if the lastSyncedLocation is valid.. 173 Location l = lastAppendLocation.get(); 174 if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) { 175 l = null; 176 } 177 178 // If we know the last location that was ok.. then we can skip lots 179 // of checking 180 try{ 181 l = recoveryCheck(currentWriteFile, l); 182 lastAppendLocation.set(l); 183 }catch(IOException e){ 184 LOG.warn("recovery check failed", e); 185 } 186 } 187 188 storeState(false); 189 190 cleanupTask = new Runnable() { 191 public void run() { 192 cleanup(); 193 } 194 }; 195 this.scheduler = new Scheduler("AsyncDataManager Scheduler"); 196 try { 197 this.scheduler.start(); 198 } catch (Exception e) { 199 IOException ioe = new IOException("scheduler start: " + e); 200 ioe.initCause(e); 201 throw ioe; 202 } 203 this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); 204 } 205 206 public void lock() throws IOException { 207 synchronized (this) { 208 if (controlFile == null || controlFile.isDisposed()) { 209 IOHelper.mkdirs(directory); 210 controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH); 211 } 212 controlFile.lock(); 213 } 214 } 215 216 protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException { 217 if (location == null) { 218 location = new Location(); 219 location.setDataFileId(dataFile.getDataFileId()); 220 location.setOffset(0); 221 } 222 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 223 try { 224 reader.readLocationDetails(location); 225 while (reader.readLocationDetailsAndValidate(location)) { 226 location.setOffset(location.getOffset() + location.getSize()); 227 } 228 } finally { 229 accessorPool.closeDataFileAccessor(reader); 230 } 231 dataFile.setLength(location.getOffset()); 232 return location; 233 } 234 235 protected void unmarshallState(ByteSequence sequence) throws IOException { 236 ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength()); 237 DataInputStream dis = new DataInputStream(bais); 238 if (dis.readBoolean()) { 239 mark = new Location(); 240 mark.readExternal(dis); 241 } else { 242 mark = null; 243 } 244 if (dis.readBoolean()) { 245 Location l = new Location(); 246 l.readExternal(dis); 247 lastAppendLocation.set(l); 248 } else { 249 lastAppendLocation.set(null); 250 } 251 } 252 253 private synchronized ByteSequence marshallState() throws IOException { 254 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 255 DataOutputStream dos = new DataOutputStream(baos); 256 257 if (mark != null) { 258 dos.writeBoolean(true); 259 mark.writeExternal(dos); 260 } else { 261 dos.writeBoolean(false); 262 } 263 Location l = lastAppendLocation.get(); 264 if (l != null) { 265 dos.writeBoolean(true); 266 l.writeExternal(dos); 267 } else { 268 dos.writeBoolean(false); 269 } 270 271 byte[] bs = baos.toByteArray(); 272 return new ByteSequence(bs, 0, bs.length); 273 } 274 275 synchronized DataFile allocateLocation(Location location) throws IOException { 276 if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) { 277 int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1; 278 279 String fileName = filePrefix + nextNum; 280 File file = new File(directory, fileName); 281 DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength); 282 //actually allocate the disk space 283 nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true)); 284 fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile); 285 fileByFileMap.put(file, nextWriteFile); 286 if (currentWriteFile != null) { 287 currentWriteFile.linkAfter(nextWriteFile); 288 if (currentWriteFile.isUnused()) { 289 removeDataFile(currentWriteFile); 290 } 291 } 292 currentWriteFile = nextWriteFile; 293 294 } 295 location.setOffset(currentWriteFile.getLength()); 296 location.setDataFileId(currentWriteFile.getDataFileId().intValue()); 297 int size = location.getSize(); 298 currentWriteFile.incrementLength(size); 299 currentWriteFile.increment(); 300 storeSize.addAndGet(size); 301 return currentWriteFile; 302 } 303 304 public synchronized void removeLocation(Location location) throws IOException{ 305 306 DataFile dataFile = getDataFile(location); 307 dataFile.decrement(); 308 } 309 310 synchronized DataFile getDataFile(Location item) throws IOException { 311 Integer key = Integer.valueOf(item.getDataFileId()); 312 DataFile dataFile = fileMap.get(key); 313 if (dataFile == null) { 314 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 315 throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId()); 316 } 317 return dataFile; 318 } 319 320 synchronized File getFile(Location item) throws IOException { 321 Integer key = Integer.valueOf(item.getDataFileId()); 322 DataFile dataFile = fileMap.get(key); 323 if (dataFile == null) { 324 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 325 throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId()); 326 } 327 return dataFile.getFile(); 328 } 329 330 private DataFile getNextDataFile(DataFile dataFile) { 331 return (DataFile)dataFile.getNext(); 332 } 333 334 public synchronized void close() throws IOException { 335 if (!started) { 336 return; 337 } 338 this.scheduler.cancel(cleanupTask); 339 try { 340 this.scheduler.stop(); 341 } catch (Exception e) { 342 IOException ioe = new IOException("scheduler stop: " + e); 343 ioe.initCause(e); 344 throw ioe; 345 } 346 accessorPool.close(); 347 storeState(false); 348 appender.close(); 349 fileMap.clear(); 350 fileByFileMap.clear(); 351 controlFile.unlock(); 352 controlFile.dispose(); 353 started = false; 354 } 355 356 synchronized void cleanup() { 357 if (accessorPool != null) { 358 accessorPool.disposeUnused(); 359 } 360 } 361 362 public synchronized boolean delete() throws IOException { 363 364 // Close all open file handles... 365 appender.close(); 366 accessorPool.close(); 367 368 boolean result = true; 369 for (Iterator i = fileMap.values().iterator(); i.hasNext();) { 370 DataFile dataFile = (DataFile)i.next(); 371 storeSize.addAndGet(-dataFile.getLength()); 372 result &= dataFile.delete(); 373 } 374 fileMap.clear(); 375 fileByFileMap.clear(); 376 lastAppendLocation.set(null); 377 mark = null; 378 currentWriteFile = null; 379 380 // reopen open file handles... 381 accessorPool = new DataFileAccessorPool(this); 382 if (useNio) { 383 appender = new NIODataFileAppender(this); 384 } else { 385 appender = new DataFileAppender(this); 386 } 387 return result; 388 } 389 390 public synchronized void addInterestInFile(int file) throws IOException { 391 if (file >= 0) { 392 Integer key = Integer.valueOf(file); 393 DataFile dataFile = fileMap.get(key); 394 if (dataFile == null) { 395 throw new IOException("That data file does not exist"); 396 } 397 addInterestInFile(dataFile); 398 } 399 } 400 401 synchronized void addInterestInFile(DataFile dataFile) { 402 if (dataFile != null) { 403 dataFile.increment(); 404 } 405 } 406 407 public synchronized void removeInterestInFile(int file) throws IOException { 408 if (file >= 0) { 409 Integer key = Integer.valueOf(file); 410 DataFile dataFile = fileMap.get(key); 411 removeInterestInFile(dataFile); 412 } 413 414 } 415 416 synchronized void removeInterestInFile(DataFile dataFile) throws IOException { 417 if (dataFile != null) { 418 if (dataFile.decrement() <= 0) { 419 removeDataFile(dataFile); 420 } 421 } 422 } 423 424 public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException { 425 Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet()); 426 unUsed.removeAll(inUse); 427 unUsed.removeAll(inProgress); 428 429 List<DataFile> purgeList = new ArrayList<DataFile>(); 430 for (Integer key : unUsed) { 431 DataFile dataFile = fileMap.get(key); 432 purgeList.add(dataFile); 433 } 434 for (DataFile dataFile : purgeList) { 435 if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) { 436 forceRemoveDataFile(dataFile); 437 } 438 } 439 } 440 441 public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException { 442 Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet()); 443 unUsed.removeAll(inUse); 444 445 List<DataFile> purgeList = new ArrayList<DataFile>(); 446 for (Integer key : unUsed) { 447 // Only add files less than the lastFile.. 448 if( key.intValue() < lastFile.intValue() ) { 449 DataFile dataFile = fileMap.get(key); 450 purgeList.add(dataFile); 451 } 452 } 453 if (LOG.isDebugEnabled()) { 454 LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList); 455 } 456 for (DataFile dataFile : purgeList) { 457 forceRemoveDataFile(dataFile); 458 } 459 } 460 461 public synchronized void consolidateDataFiles() throws IOException { 462 List<DataFile> purgeList = new ArrayList<DataFile>(); 463 for (DataFile dataFile : fileMap.values()) { 464 if (dataFile.isUnused()) { 465 purgeList.add(dataFile); 466 } 467 } 468 for (DataFile dataFile : purgeList) { 469 removeDataFile(dataFile); 470 } 471 } 472 473 private synchronized void removeDataFile(DataFile dataFile) throws IOException { 474 475 // Make sure we don't delete too much data. 476 if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) { 477 LOG.debug("Won't remove DataFile" + dataFile); 478 return; 479 } 480 forceRemoveDataFile(dataFile); 481 } 482 483 private synchronized void forceRemoveDataFile(DataFile dataFile) 484 throws IOException { 485 accessorPool.disposeDataFileAccessors(dataFile); 486 fileByFileMap.remove(dataFile.getFile()); 487 fileMap.remove(dataFile.getDataFileId()); 488 storeSize.addAndGet(-dataFile.getLength()); 489 dataFile.unlink(); 490 if (archiveDataLogs) { 491 dataFile.move(getDirectoryArchive()); 492 LOG.debug("moved data file " + dataFile + " to " 493 + getDirectoryArchive()); 494 } else { 495 boolean result = dataFile.delete(); 496 if (!result) { 497 LOG.info("Failed to discard data file " + dataFile); 498 } 499 } 500 } 501 502 /** 503 * @return the maxFileLength 504 */ 505 public int getMaxFileLength() { 506 return maxFileLength; 507 } 508 509 /** 510 * @param maxFileLength the maxFileLength to set 511 */ 512 public void setMaxFileLength(int maxFileLength) { 513 this.maxFileLength = maxFileLength; 514 } 515 516 @Override 517 public String toString() { 518 return "DataManager:(" + filePrefix + ")"; 519 } 520 521 public synchronized Location getMark() throws IllegalStateException { 522 return mark; 523 } 524 525 public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException { 526 527 Location cur = null; 528 while (true) { 529 if (cur == null) { 530 if (location == null) { 531 DataFile head = (DataFile)currentWriteFile.getHeadNode(); 532 cur = new Location(); 533 cur.setDataFileId(head.getDataFileId()); 534 cur.setOffset(0); 535 } else { 536 // Set to the next offset.. 537 if( location.getSize() == -1 ) { 538 cur = new Location(location); 539 } else { 540 cur = new Location(location); 541 cur.setOffset(location.getOffset()+location.getSize()); 542 } 543 } 544 } else { 545 cur.setOffset(cur.getOffset() + cur.getSize()); 546 } 547 548 DataFile dataFile = getDataFile(cur); 549 550 // Did it go into the next file?? 551 if (dataFile.getLength() <= cur.getOffset()) { 552 dataFile = getNextDataFile(dataFile); 553 if (dataFile == null) { 554 return null; 555 } else { 556 cur.setDataFileId(dataFile.getDataFileId().intValue()); 557 cur.setOffset(0); 558 } 559 } 560 561 // Load in location size and type. 562 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 563 try { 564 reader.readLocationDetails(cur); 565 } finally { 566 accessorPool.closeDataFileAccessor(reader); 567 } 568 569 if (cur.getType() == 0) { 570 return null; 571 } else if (cur.getType() > 0) { 572 // Only return user records. 573 return cur; 574 } 575 } 576 } 577 578 public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{ 579 DataFile df = fileByFileMap.get(file); 580 return getNextLocation(df, lastLocation,thisFileOnly); 581 } 582 583 public synchronized Location getNextLocation(DataFile dataFile, 584 Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException { 585 586 Location cur = null; 587 while (true) { 588 if (cur == null) { 589 if (lastLocation == null) { 590 DataFile head = (DataFile)dataFile.getHeadNode(); 591 cur = new Location(); 592 cur.setDataFileId(head.getDataFileId()); 593 cur.setOffset(0); 594 } else { 595 // Set to the next offset.. 596 cur = new Location(lastLocation); 597 cur.setOffset(cur.getOffset() + cur.getSize()); 598 } 599 } else { 600 cur.setOffset(cur.getOffset() + cur.getSize()); 601 } 602 603 604 // Did it go into the next file?? 605 if (dataFile.getLength() <= cur.getOffset()) { 606 if (thisFileOnly) { 607 return null; 608 }else { 609 dataFile = getNextDataFile(dataFile); 610 if (dataFile == null) { 611 return null; 612 } else { 613 cur.setDataFileId(dataFile.getDataFileId().intValue()); 614 cur.setOffset(0); 615 } 616 } 617 } 618 619 // Load in location size and type. 620 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 621 try { 622 reader.readLocationDetails(cur); 623 } finally { 624 accessorPool.closeDataFileAccessor(reader); 625 } 626 627 if (cur.getType() == 0) { 628 return null; 629 } else if (cur.getType() > 0) { 630 // Only return user records. 631 return cur; 632 } 633 } 634 } 635 636 public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException { 637 DataFile dataFile = getDataFile(location); 638 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 639 ByteSequence rc = null; 640 try { 641 rc = reader.readRecord(location); 642 } finally { 643 accessorPool.closeDataFileAccessor(reader); 644 } 645 return rc; 646 } 647 648 public void setMark(Location location, boolean sync) throws IOException, IllegalStateException { 649 synchronized (this) { 650 mark = location; 651 } 652 storeState(sync); 653 } 654 655 protected synchronized void storeState(boolean sync) throws IOException { 656 ByteSequence state = marshallState(); 657 appender.storeItem(state, Location.MARK_TYPE, sync); 658 controlFile.store(state, sync); 659 } 660 661 public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { 662 Location loc = appender.storeItem(data, Location.USER_TYPE, sync); 663 return loc; 664 } 665 666 public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException { 667 Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete); 668 return loc; 669 } 670 671 public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { 672 return appender.storeItem(data, type, sync); 673 } 674 675 public void update(Location location, ByteSequence data, boolean sync) throws IOException { 676 DataFile dataFile = getDataFile(location); 677 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); 678 try { 679 updater.updateRecord(location, data, sync); 680 } finally { 681 accessorPool.closeDataFileAccessor(updater); 682 } 683 } 684 685 public File getDirectory() { 686 return directory; 687 } 688 689 public void setDirectory(File directory) { 690 this.directory = directory; 691 } 692 693 public String getFilePrefix() { 694 return filePrefix; 695 } 696 697 public void setFilePrefix(String filePrefix) { 698 this.filePrefix = IOHelper.toFileSystemSafeName(filePrefix); 699 } 700 701 public Map<WriteKey, WriteCommand> getInflightWrites() { 702 return inflightWrites; 703 } 704 705 public Location getLastAppendLocation() { 706 return lastAppendLocation.get(); 707 } 708 709 public void setLastAppendLocation(Location lastSyncedLocation) { 710 this.lastAppendLocation.set(lastSyncedLocation); 711 } 712 713 public boolean isUseNio() { 714 return useNio; 715 } 716 717 public void setUseNio(boolean useNio) { 718 this.useNio = useNio; 719 } 720 721 public File getDirectoryArchive() { 722 return directoryArchive; 723 } 724 725 public void setDirectoryArchive(File directoryArchive) { 726 this.directoryArchive = directoryArchive; 727 } 728 729 public boolean isArchiveDataLogs() { 730 return archiveDataLogs; 731 } 732 733 public void setArchiveDataLogs(boolean archiveDataLogs) { 734 this.archiveDataLogs = archiveDataLogs; 735 } 736 737 synchronized public Integer getCurrentDataFileId() { 738 if( currentWriteFile==null ) 739 return null; 740 return currentWriteFile.getDataFileId(); 741 } 742 743 /** 744 * Get a set of files - only valid after start() 745 * @return files currently being used 746 */ 747 public Set<File> getFiles(){ 748 return fileByFileMap.keySet(); 749 } 750 751 synchronized public long getDiskSize() { 752 long rc=0; 753 DataFile cur = (DataFile)currentWriteFile.getHeadNode(); 754 while( cur !=null ) { 755 rc += cur.getLength(); 756 cur = (DataFile) cur.getNext(); 757 } 758 return rc; 759 } 760 761 synchronized public long getDiskSizeUntil(Location startPosition) { 762 long rc=0; 763 DataFile cur = (DataFile)currentWriteFile.getHeadNode(); 764 while( cur !=null ) { 765 if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) { 766 return rc + startPosition.getOffset(); 767 } 768 rc += cur.getLength(); 769 cur = (DataFile) cur.getNext(); 770 } 771 return rc; 772 } 773 774 }