001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.kahadb.journal; 018 019 import java.io.File; 020 import java.io.FilenameFilter; 021 import java.io.IOException; 022 import java.io.UnsupportedEncodingException; 023 import java.util.ArrayList; 024 import java.util.Collections; 025 import java.util.HashMap; 026 import java.util.Iterator; 027 import java.util.LinkedHashMap; 028 import java.util.List; 029 import java.util.Map; 030 import java.util.Set; 031 import java.util.Timer; 032 import java.util.TimerTask; 033 import java.util.TreeMap; 034 import java.util.concurrent.ConcurrentHashMap; 035 import java.util.concurrent.atomic.AtomicLong; 036 import java.util.concurrent.atomic.AtomicReference; 037 import java.util.zip.Adler32; 038 import java.util.zip.Checksum; 039 import org.slf4j.Logger; 040 import org.slf4j.LoggerFactory; 041 import org.apache.kahadb.journal.DataFileAppender.WriteCommand; 042 import org.apache.kahadb.journal.DataFileAppender.WriteKey; 043 import org.apache.kahadb.util.ByteSequence; 044 import org.apache.kahadb.util.DataByteArrayInputStream; 045 import org.apache.kahadb.util.DataByteArrayOutputStream; 046 import org.apache.kahadb.util.LinkedNodeList; 047 import org.apache.kahadb.util.SchedulerTimerTask; 048 import org.apache.kahadb.util.Sequence; 049 050 /** 051 * Manages DataFiles 052 * 053 * 054 */ 055 public class Journal { 056 057 private static final int MAX_BATCH_SIZE = 32*1024*1024; 058 059 // ITEM_HEAD_SPACE = length + type+ reserved space + SOR 060 public static final int RECORD_HEAD_SPACE = 4 + 1; 061 062 public static final byte USER_RECORD_TYPE = 1; 063 public static final byte BATCH_CONTROL_RECORD_TYPE = 2; 064 // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. 065 public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH"); 066 public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8; 067 public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader(); 068 069 private static byte[] createBatchControlRecordHeader() { 070 try { 071 DataByteArrayOutputStream os = new DataByteArrayOutputStream(); 072 os.writeInt(BATCH_CONTROL_RECORD_SIZE); 073 os.writeByte(BATCH_CONTROL_RECORD_TYPE); 074 os.write(BATCH_CONTROL_RECORD_MAGIC); 075 ByteSequence sequence = os.toByteSequence(); 076 sequence.compact(); 077 return sequence.getData(); 078 } catch (IOException e) { 079 throw new RuntimeException("Could not create batch control record header."); 080 } 081 } 082 083 public static final String DEFAULT_DIRECTORY = "."; 084 public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; 085 public static final String DEFAULT_FILE_PREFIX = "db-"; 086 public static final String DEFAULT_FILE_SUFFIX = ".log"; 087 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 088 public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; 089 public static final int PREFERED_DIFF = 1024 * 512; 090 public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4; 091 092 private static final Logger LOG = LoggerFactory.getLogger(Journal.class); 093 094 protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); 095 096 protected File directory = new File(DEFAULT_DIRECTORY); 097 protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY); 098 protected String filePrefix = DEFAULT_FILE_PREFIX; 099 protected String fileSuffix = DEFAULT_FILE_SUFFIX; 100 protected boolean started; 101 102 protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; 103 protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF; 104 protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE; 105 106 protected DataFileAppender appender; 107 protected DataFileAccessorPool accessorPool; 108 109 protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); 110 protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>(); 111 protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>(); 112 113 protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>(); 114 protected Runnable cleanupTask; 115 protected AtomicLong totalLength = new AtomicLong(); 116 protected boolean archiveDataLogs; 117 private ReplicationTarget replicationTarget; 118 protected boolean checksum; 119 protected boolean checkForCorruptionOnStartup; 120 private Timer timer; 121 122 123 public synchronized void start() throws IOException { 124 if (started) { 125 return; 126 } 127 128 long start = System.currentTimeMillis(); 129 accessorPool = new DataFileAccessorPool(this); 130 started = true; 131 preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF); 132 133 appender = new DataFileAppender(this); 134 135 File[] files = directory.listFiles(new FilenameFilter() { 136 public boolean accept(File dir, String n) { 137 return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix); 138 } 139 }); 140 141 if (files != null) { 142 for (int i = 0; i < files.length; i++) { 143 try { 144 File file = files[i]; 145 String n = file.getName(); 146 String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length()); 147 int num = Integer.parseInt(numStr); 148 DataFile dataFile = new DataFile(file, num, preferedFileLength); 149 fileMap.put(dataFile.getDataFileId(), dataFile); 150 totalLength.addAndGet(dataFile.getLength()); 151 } catch (NumberFormatException e) { 152 // Ignore file that do not match the pattern. 153 } 154 } 155 156 // Sort the list so that we can link the DataFiles together in the 157 // right order. 158 List<DataFile> l = new ArrayList<DataFile>(fileMap.values()); 159 Collections.sort(l); 160 for (DataFile df : l) { 161 if (df.getLength() == 0) { 162 // possibly the result of a previous failed write 163 LOG.info("ignoring zero length, partially initialised journal data file: " + df); 164 continue; 165 } 166 dataFiles.addLast(df); 167 fileByFileMap.put(df.getFile(), df); 168 169 if( isCheckForCorruptionOnStartup() ) { 170 lastAppendLocation.set(recoveryCheck(df)); 171 } 172 } 173 } 174 175 getCurrentWriteFile(); 176 177 if( lastAppendLocation.get()==null ) { 178 DataFile df = dataFiles.getTail(); 179 lastAppendLocation.set(recoveryCheck(df)); 180 } 181 182 cleanupTask = new Runnable() { 183 public void run() { 184 cleanup(); 185 } 186 }; 187 this.timer = new Timer("KahaDB Scheduler", true); 188 TimerTask task = new SchedulerTimerTask(cleanupTask); 189 this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL); 190 long end = System.currentTimeMillis(); 191 LOG.trace("Startup took: "+(end-start)+" ms"); 192 } 193 194 private static byte[] bytes(String string) { 195 try { 196 return string.getBytes("UTF-8"); 197 } catch (UnsupportedEncodingException e) { 198 throw new RuntimeException(e); 199 } 200 } 201 202 protected Location recoveryCheck(DataFile dataFile) throws IOException { 203 Location location = new Location(); 204 location.setDataFileId(dataFile.getDataFileId()); 205 location.setOffset(0); 206 207 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 208 try { 209 while( true ) { 210 int size = checkBatchRecord(reader, location.getOffset()); 211 if ( size>=0 ) { 212 location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size); 213 } else { 214 215 // Perhaps it's just some corruption... scan through the file to find the next valid batch record. We 216 // may have subsequent valid batch records. 217 int nextOffset = findNextBatchRecord(reader, location.getOffset()+1); 218 if( nextOffset >=0 ) { 219 Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1); 220 LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence); 221 dataFile.corruptedBlocks.add(sequence); 222 location.setOffset(nextOffset); 223 } else { 224 break; 225 } 226 } 227 } 228 229 } catch (IOException e) { 230 } finally { 231 accessorPool.closeDataFileAccessor(reader); 232 } 233 234 int existingLen = dataFile.getLength(); 235 dataFile.setLength(location.getOffset()); 236 if (existingLen > dataFile.getLength()) { 237 totalLength.addAndGet(dataFile.getLength() - existingLen); 238 } 239 240 if( !dataFile.corruptedBlocks.isEmpty() ) { 241 // Is the end of the data file corrupted? 242 if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) { 243 dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst()); 244 } 245 } 246 247 return location; 248 } 249 250 private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException { 251 ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER); 252 byte data[] = new byte[1024*4]; 253 ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data)); 254 255 int pos = 0; 256 while( true ) { 257 pos = bs.indexOf(header, pos); 258 if( pos >= 0 ) { 259 return offset+pos; 260 } else { 261 // need to load the next data chunck in.. 262 if( bs.length != data.length ) { 263 // If we had a short read then we were at EOF 264 return -1; 265 } 266 offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length; 267 bs = new ByteSequence(data, 0, reader.read(offset, data)); 268 pos=0; 269 } 270 } 271 } 272 273 274 public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException { 275 byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE]; 276 DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord); 277 278 reader.readFully(offset, controlRecord); 279 280 // Assert that it's a batch record. 281 for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) { 282 if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) { 283 return -1; 284 } 285 } 286 287 int size = controlIs.readInt(); 288 if( size > MAX_BATCH_SIZE ) { 289 return -1; 290 } 291 292 if( isChecksum() ) { 293 294 long expectedChecksum = controlIs.readLong(); 295 if( expectedChecksum == 0 ) { 296 // Checksuming was not enabled when the record was stored. 297 // we can't validate the record :( 298 return size; 299 } 300 301 byte data[] = new byte[size]; 302 reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data); 303 304 Checksum checksum = new Adler32(); 305 checksum.update(data, 0, data.length); 306 307 if( expectedChecksum!=checksum.getValue() ) { 308 return -1; 309 } 310 311 } 312 return size; 313 } 314 315 316 void addToTotalLength(int size) { 317 totalLength.addAndGet(size); 318 } 319 320 321 synchronized DataFile getCurrentWriteFile() throws IOException { 322 if (dataFiles.isEmpty()) { 323 rotateWriteFile(); 324 } 325 return dataFiles.getTail(); 326 } 327 328 synchronized DataFile rotateWriteFile() { 329 int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1; 330 File file = getFile(nextNum); 331 DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength); 332 // actually allocate the disk space 333 fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile); 334 fileByFileMap.put(file, nextWriteFile); 335 dataFiles.addLast(nextWriteFile); 336 return nextWriteFile; 337 } 338 339 public File getFile(int nextNum) { 340 String fileName = filePrefix + nextNum + fileSuffix; 341 File file = new File(directory, fileName); 342 return file; 343 } 344 345 synchronized DataFile getDataFile(Location item) throws IOException { 346 Integer key = Integer.valueOf(item.getDataFileId()); 347 DataFile dataFile = fileMap.get(key); 348 if (dataFile == null) { 349 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 350 throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); 351 } 352 return dataFile; 353 } 354 355 synchronized File getFile(Location item) throws IOException { 356 Integer key = Integer.valueOf(item.getDataFileId()); 357 DataFile dataFile = fileMap.get(key); 358 if (dataFile == null) { 359 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 360 throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); 361 } 362 return dataFile.getFile(); 363 } 364 365 private DataFile getNextDataFile(DataFile dataFile) { 366 return dataFile.getNext(); 367 } 368 369 public synchronized void close() throws IOException { 370 if (!started) { 371 return; 372 } 373 if (this.timer != null) { 374 this.timer.cancel(); 375 } 376 accessorPool.close(); 377 appender.close(); 378 fileMap.clear(); 379 fileByFileMap.clear(); 380 dataFiles.clear(); 381 lastAppendLocation.set(null); 382 started = false; 383 } 384 385 synchronized void cleanup() { 386 if (accessorPool != null) { 387 accessorPool.disposeUnused(); 388 } 389 } 390 391 public synchronized boolean delete() throws IOException { 392 393 // Close all open file handles... 394 appender.close(); 395 accessorPool.close(); 396 397 boolean result = true; 398 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 399 DataFile dataFile = i.next(); 400 totalLength.addAndGet(-dataFile.getLength()); 401 result &= dataFile.delete(); 402 } 403 fileMap.clear(); 404 fileByFileMap.clear(); 405 lastAppendLocation.set(null); 406 dataFiles = new LinkedNodeList<DataFile>(); 407 408 // reopen open file handles... 409 accessorPool = new DataFileAccessorPool(this); 410 appender = new DataFileAppender(this); 411 return result; 412 } 413 414 public synchronized void removeDataFiles(Set<Integer> files) throws IOException { 415 for (Integer key : files) { 416 // Can't remove the data file (or subsequent files) that is currently being written to. 417 if( key >= lastAppendLocation.get().getDataFileId() ) { 418 continue; 419 } 420 DataFile dataFile = fileMap.get(key); 421 if( dataFile!=null ) { 422 forceRemoveDataFile(dataFile); 423 } 424 } 425 } 426 427 private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException { 428 accessorPool.disposeDataFileAccessors(dataFile); 429 fileByFileMap.remove(dataFile.getFile()); 430 fileMap.remove(dataFile.getDataFileId()); 431 totalLength.addAndGet(-dataFile.getLength()); 432 dataFile.unlink(); 433 if (archiveDataLogs) { 434 dataFile.move(getDirectoryArchive()); 435 LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive()); 436 } else { 437 if ( dataFile.delete() ) { 438 LOG.debug("Discarded data file " + dataFile); 439 } else { 440 LOG.warn("Failed to discard data file " + dataFile.getFile()); 441 } 442 } 443 } 444 445 /** 446 * @return the maxFileLength 447 */ 448 public int getMaxFileLength() { 449 return maxFileLength; 450 } 451 452 /** 453 * @param maxFileLength the maxFileLength to set 454 */ 455 public void setMaxFileLength(int maxFileLength) { 456 this.maxFileLength = maxFileLength; 457 } 458 459 @Override 460 public String toString() { 461 return directory.toString(); 462 } 463 464 public synchronized void appendedExternally(Location loc, int length) throws IOException { 465 DataFile dataFile = null; 466 if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) { 467 // It's an update to the current log file.. 468 dataFile = dataFiles.getTail(); 469 dataFile.incrementLength(length); 470 } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) { 471 // It's an update to the next log file. 472 int nextNum = loc.getDataFileId(); 473 File file = getFile(nextNum); 474 dataFile = new DataFile(file, nextNum, preferedFileLength); 475 // actually allocate the disk space 476 fileMap.put(dataFile.getDataFileId(), dataFile); 477 fileByFileMap.put(file, dataFile); 478 dataFiles.addLast(dataFile); 479 } else { 480 throw new IOException("Invalid external append."); 481 } 482 } 483 484 public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException { 485 486 Location cur = null; 487 while (true) { 488 if (cur == null) { 489 if (location == null) { 490 DataFile head = dataFiles.getHead(); 491 if( head == null ) { 492 return null; 493 } 494 cur = new Location(); 495 cur.setDataFileId(head.getDataFileId()); 496 cur.setOffset(0); 497 } else { 498 // Set to the next offset.. 499 if (location.getSize() == -1) { 500 cur = new Location(location); 501 } else { 502 cur = new Location(location); 503 cur.setOffset(location.getOffset() + location.getSize()); 504 } 505 } 506 } else { 507 cur.setOffset(cur.getOffset() + cur.getSize()); 508 } 509 510 DataFile dataFile = getDataFile(cur); 511 512 // Did it go into the next file?? 513 if (dataFile.getLength() <= cur.getOffset()) { 514 dataFile = getNextDataFile(dataFile); 515 if (dataFile == null) { 516 return null; 517 } else { 518 cur.setDataFileId(dataFile.getDataFileId().intValue()); 519 cur.setOffset(0); 520 } 521 } 522 523 // Load in location size and type. 524 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 525 try { 526 reader.readLocationDetails(cur); 527 } finally { 528 accessorPool.closeDataFileAccessor(reader); 529 } 530 531 if (cur.getType() == 0) { 532 return null; 533 } else if (cur.getType() == USER_RECORD_TYPE) { 534 // Only return user records. 535 return cur; 536 } 537 } 538 } 539 540 public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException { 541 DataFile df = fileByFileMap.get(file); 542 return getNextLocation(df, lastLocation, thisFileOnly); 543 } 544 545 public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException { 546 547 Location cur = null; 548 while (true) { 549 if (cur == null) { 550 if (lastLocation == null) { 551 DataFile head = dataFile.getHeadNode(); 552 cur = new Location(); 553 cur.setDataFileId(head.getDataFileId()); 554 cur.setOffset(0); 555 } else { 556 // Set to the next offset.. 557 cur = new Location(lastLocation); 558 cur.setOffset(cur.getOffset() + cur.getSize()); 559 } 560 } else { 561 cur.setOffset(cur.getOffset() + cur.getSize()); 562 } 563 564 // Did it go into the next file?? 565 if (dataFile.getLength() <= cur.getOffset()) { 566 if (thisFileOnly) { 567 return null; 568 } else { 569 dataFile = getNextDataFile(dataFile); 570 if (dataFile == null) { 571 return null; 572 } else { 573 cur.setDataFileId(dataFile.getDataFileId().intValue()); 574 cur.setOffset(0); 575 } 576 } 577 } 578 579 // Load in location size and type. 580 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 581 try { 582 reader.readLocationDetails(cur); 583 } finally { 584 accessorPool.closeDataFileAccessor(reader); 585 } 586 587 if (cur.getType() == 0) { 588 return null; 589 } else if (cur.getType() > 0) { 590 // Only return user records. 591 return cur; 592 } 593 } 594 } 595 596 public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException { 597 DataFile dataFile = getDataFile(location); 598 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 599 ByteSequence rc = null; 600 try { 601 rc = reader.readRecord(location); 602 } finally { 603 accessorPool.closeDataFileAccessor(reader); 604 } 605 return rc; 606 } 607 608 public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { 609 Location loc = appender.storeItem(data, Location.USER_TYPE, sync); 610 return loc; 611 } 612 613 public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException { 614 Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete); 615 return loc; 616 } 617 618 public void update(Location location, ByteSequence data, boolean sync) throws IOException { 619 DataFile dataFile = getDataFile(location); 620 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); 621 try { 622 updater.updateRecord(location, data, sync); 623 } finally { 624 accessorPool.closeDataFileAccessor(updater); 625 } 626 } 627 628 public File getDirectory() { 629 return directory; 630 } 631 632 public void setDirectory(File directory) { 633 this.directory = directory; 634 } 635 636 public String getFilePrefix() { 637 return filePrefix; 638 } 639 640 public void setFilePrefix(String filePrefix) { 641 this.filePrefix = filePrefix; 642 } 643 644 public Map<WriteKey, WriteCommand> getInflightWrites() { 645 return inflightWrites; 646 } 647 648 public Location getLastAppendLocation() { 649 return lastAppendLocation.get(); 650 } 651 652 public void setLastAppendLocation(Location lastSyncedLocation) { 653 this.lastAppendLocation.set(lastSyncedLocation); 654 } 655 656 public File getDirectoryArchive() { 657 return directoryArchive; 658 } 659 660 public void setDirectoryArchive(File directoryArchive) { 661 this.directoryArchive = directoryArchive; 662 } 663 664 public boolean isArchiveDataLogs() { 665 return archiveDataLogs; 666 } 667 668 public void setArchiveDataLogs(boolean archiveDataLogs) { 669 this.archiveDataLogs = archiveDataLogs; 670 } 671 672 synchronized public Integer getCurrentDataFileId() { 673 if (dataFiles.isEmpty()) 674 return null; 675 return dataFiles.getTail().getDataFileId(); 676 } 677 678 /** 679 * Get a set of files - only valid after start() 680 * 681 * @return files currently being used 682 */ 683 public Set<File> getFiles() { 684 return fileByFileMap.keySet(); 685 } 686 687 public synchronized Map<Integer, DataFile> getFileMap() { 688 return new TreeMap<Integer, DataFile>(fileMap); 689 } 690 691 public long getDiskSize() { 692 long tailLength=0; 693 synchronized( this ) { 694 if( !dataFiles.isEmpty() ) { 695 tailLength = dataFiles.getTail().getLength(); 696 } 697 } 698 699 long rc = totalLength.get(); 700 701 // The last file is actually at a minimum preferedFileLength big. 702 if( tailLength < preferedFileLength ) { 703 rc -= tailLength; 704 rc += preferedFileLength; 705 } 706 return rc; 707 } 708 709 public void setReplicationTarget(ReplicationTarget replicationTarget) { 710 this.replicationTarget = replicationTarget; 711 } 712 public ReplicationTarget getReplicationTarget() { 713 return replicationTarget; 714 } 715 716 public String getFileSuffix() { 717 return fileSuffix; 718 } 719 720 public void setFileSuffix(String fileSuffix) { 721 this.fileSuffix = fileSuffix; 722 } 723 724 public boolean isChecksum() { 725 return checksum; 726 } 727 728 public void setChecksum(boolean checksumWrites) { 729 this.checksum = checksumWrites; 730 } 731 732 public boolean isCheckForCorruptionOnStartup() { 733 return checkForCorruptionOnStartup; 734 } 735 736 public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) { 737 this.checkForCorruptionOnStartup = checkForCorruptionOnStartup; 738 } 739 740 public void setWriteBatchSize(int writeBatchSize) { 741 this.writeBatchSize = writeBatchSize; 742 } 743 744 public int getWriteBatchSize() { 745 return writeBatchSize; 746 } 747 748 public void setSizeAccumulator(AtomicLong storeSizeAccumulator) { 749 this.totalLength = storeSizeAccumulator; 750 } 751 }