Cassandra In Action - Store

2018/07/02

Cassandra.Store

ColumnFamilyStore

ColumnFamilyStore switchMemtable方法将刷盘动作提交flush到flushExecutor线程池处理。flushExecutor线程池的数量等于memtable_flush_writers的数量。flushExecutor将flush事件提交到perDiskflushExecutors进行处理。flush操作会阻塞知道所有刷盘动作执行完成。

‘createSSTableMultiWriter’方法用于创建SSTableWriter。SSTableWriter内部实现了刷盘操作。

flush方法是由谁、什么时间、什么情况下调用的目前还不清楚,这里后面可以再分析,现在主要分析刷盘时候做了什么事情,如何刷盘的。现在知道memtable是直接插入内存,这个速度会非常的快,那刷盘呢?刷盘是怎么刷的,如何保持高效?

forceFlush将当前未刷盘的memtable刷盘:1.获取当前memtable;2.调用waitForFlushes方法,创建一个ListenableFutureTask<CommitLogPosition>任务,提交到postFlushExecutor线程池执行。

Flush线程的flushMemtable方法执行刷盘操作:调用memtable.flushRunnables方法创建FlushRunnable任务,flushRunnables方法调用memtable.createFlushRunnables方法创建FlushRunnable任务。FlushRunnable实例化SSTableWriter。当任务创建完成后,将任务提交到perDiskflushExecutors线程池中执行。Flush线程首先加锁,然后将memtable标记为正在刷盘,再调用flushMemtable方法执行刷盘。Memtable.Flush线程调用writeSortedContents方法,遍历所有的PartitionPosition将unfilteredIterator添加到writer。

IndexWriter负责写入索引文件(index文件,同时还负责写入BloomFilter、Summary文件),SequentialWriter负责写入数据文件(data文件)

BigTableWriter.append()方法实际写入一行记录:当memtable刷盘时,会把内存中有序的数据追加到BigTableWriter。append()方法首先获取文件的当前位置startPosition = dataFile.position(),然后调用columnIndex.buildRowIndex()方法写入数据文件,最后调用indexWriter.append()写入索引文件。columnIndex.buildRowIndex()方法首先writePartitionHeader()写头部信息,然后add()写入数据文件,最后finish()写入结束符。columnIndex.add()方法首先获取文件的当前位置currentPosition(),然后调用UnfilteredSerializer.serializer.serialize()方法,最终由该方法调用sequentialWriter写入数据。(Cassandra源码分析-存储引擎:http://zqhxuyuan.github.io/2016/10/19/Cassandra-Code-StorageEngine/#BigTableWriter)

BigTableWriter.append()方法,将数据写入文件后,再将数据在文件中的位置写入索引文件。

ColumnFamilyStore.Flush

该任务类用于交换memtable,将已满的memtable置为非活跃只读状态同时创建一个新的活跃memtable用于数据写入。

Memtable

数据先写入memtable再写入文件,现在已经分析了数据如何写入memtable和数据如何写入文件,现在要分析数据何时从memtable写入文件,这是一个关键的节点也许很简单也许要再找找。好像是有TrackerMemtable的生命周期管理类来进行标识是否可刷盘。ColumnFamilyStore.Flush构造函数创建新的memtable并调用Tracker.switchMemtable。明天研究下文件写入的格式!数据的格式是什么样的,如何记录索引。

PartitionPosition

’ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();’通过PartitionPosition来索引memtable,可以实现范围查询。

MemtablePool

Keyspace.apply

该方法将数据添加到日志文件,然后更新memtable和index。调用ColumnFamilyStore的apply方法。ColumnFamilyStore最终调用Memtable的put方法。

put方法的流程:根据数据的key获取该key在memtable中的partitionPosition,如果该key在memtable中未找到,则新建一个partitionPosition。put方法还是写内存。

AtomicBTreePartition

‘AtomicBTreePartition.addAllWithSizeDelta’将数据写入memtable。使用自定义BTree来存储数据。该类继承至抽象类’AbstractBTreePartition’,AbstractBTreePartition类持有一个Holder内部类,Hodler类定义了一个Object[],tree数组。所以memtable内部是使用一个btree数组保存数据的。这是它的数据结构。一个BTree保存的内容是什么?(OceanBase的存储结构是类似的,底层也是用btree实现的,可以参考它的实现。btree不支持删除,支持插入和更新,节点可以分裂不能合并。写操作基于读写锁+Copy On Write,读操作不加锁。并发写Btree原理剖析:https://www.cnblogs.com/foxmailed/p/2914625.html .LevelDB的memtable底层是用SkipList实现的。)

如果是用数组来保存数据,那如何检索数据?通过key可以找到PartitionPosition,PartitionPosition关联到对应的AtomicBTreeParition,一个AtomicBTreePartition对应一组btree。

Tracker

Memtable的生命周期管理类来进行标识是否可刷盘,方法switchMemtable调用view.switchMemtable方法将活跃的memtable标识为只读,同时创建新的活跃memtable。

View

lifecycle\View将活跃的memtable标识为immutable,同时创建新的活跃memtable。

rows

因为写入的数据大小是不确定过的,所以这里序列化的时候不能指定写入的大小,所以要记录所有数据的位置,这就需要索引文件。

UnfilteredSerializer

SSTable

SSTable在磁盘的数据存储是有序的,分为数据文件和索引文件。SSTable分成SSTableWriter和SSTableReader,具体操作实现类为:BigTableWriter和BigTableReader。BigTableWriter针对索引文件和数据文件的写入分别是:IndexWriter和SequentialWriter。

Unfiltered

Unfiltered->Row->AbstractRow->BTreeWow表示一行数据。

SequentialWriter

DataOutput->DataOutputPlus->DataOutputStreamPlus->BufferedDataOutputStreamPlus->SequentialWriter,DataOutput接口定义了将Java数据类型转换为字节写入到二进制流方法。接口DataOutputPlus继承了DataOutput,同时扩展了将ByteBuffer和Memory转换为字节写入流方法。抽象类DataOutputStreamPlus实现了DataOutputPlus接口同时继承了OutputStream抽象类,即提供了ByteBuffer和Memory的数据写入同时也提供了数据流的输出。实现类BufferedDataOutputStreamPlus继承DataOutputStreamPlus实现了部分接口,比如write(byte[] b)方法:将字节写入ByteBuffer。该类是线程不安全的。SequentialWriter类继承了BufferedDataOutputStreamPlus同时实现了Transactional事务管理接口。

SequentialWriter在实例化时调用父类构造函数初始化ByteBuffer(ByteBuffer.allocate()默认64K:64 * 1024,堆内内存,默认容量大小10M,超过10M就刷盘,这些数据定义在SequentialWriterOption里面),定义了openChannel打开文件方法,sync刷盘方法将byteBuffer写入文件并强制force刷盘。目前有三处会调用SequentialWriter.sync方法,BufferedDataOutputStreamPlus的所有写入方法在写完数据后都会刷盘! BigTableWriter的openFinalEarly方法:indexFile.sync()和dataFile.sync(),打开文件之前会确认byteBuffer的数据都已刷盘。OnDiskIndexBuilder的finish方法:out.sync()。

SSTableReader

BigTableReader->SSTableReader,SSTableReader.getPosition->BigTableReader.getPosition。首先从缓存从获取key的位置getCachedPosition,如果找到就直接返回,没有找到就继续。读取indexfile,获取数据在datafile中的位置。第一个int为position,第二个int为数据size。如果size=0则直接new RowIndexEntry返回;不为0,继续读取下一个int获取headerLength,columnsIndexCount。总size-headerLength-columnsIndexLength得数据的indexedPartSize。

首先获取key,如果key不相等,会继续循环查找下一个RowIndexEntry.Serializer.skip跳过这些数据到下一个位置in.skipBytesFully(size)->InputStream.skip(n),数据在indexfile中存储的格式key|position|size|data

Post Directory