MIT6.5830-Lab2
MIT6.5830 的数据库课程。Lab2 主要是完成查询的连接和过滤、实现聚合下的查询操作、对表的插入删除以及实现数据库中非常重要的 LRU 页面置换算法。
任务目录
Exercise1:实现查询的连接和过滤
Exercise2:实现聚合操作MIN、MAX、COUNT、SUM、AVG
Exercise3:实现修改表的方法,完成添加和删除
Exercise4:实现Insert和Delete操作符
Exercise5:实现BufferPool的页面置换算法
Exercise1
实现Predicate、JoinPredicate、Filter、Join
Predicate类
Filter的辅助类,用于筛选满足条件的tuple:将tuple中的字段与指定字段进行比较,实现对单个tuple的过滤操作(比较逻辑有==、>=、<=、>、<、!=、LIKE等)。
参数:
指定的比较字段(也是
Field.compare()
的参数):private Field operand;
tuple中指定字段对应字段的索引:
private int fieldIndex;
执行的比较逻辑,运算操作符(也是
Field.compare()
的参数):private Op op;
方法:
构造方法:
public Predicate(int field, Op op, Field operand)
返回tuple中对应字段的索引:
public int getField()
返回运算操作符:
public Op getOp()
返回指定的比较字段:
public Field getOperand()
对元组t进行比较:
public boolean filter(Tuple t)
1
2
3
4
5public boolean filter(Tuple t) {
// TODO: some code goes here
Field otherOperand = t.getField(fieldIndex);
return otherOperand.compare(op, operand); // 例如: otherOperand >= operand
}
IntField和StringField分别实现了Field接口中的compare(op, operand)
方法。
- 返回展示结果:
public String toString()
Filter类
Filter类实现了Operator接口,根据Predicate的判断结果,得到满足结果的tuples,实现了类似于where age > 16
的过滤操作。
参数:
通过Predicate成员变量,实现对每一个tuple的过滤操作:
private Predicate predicate;
待过滤的tuples迭代器,处理输入的元组流,并产生输出的元组流(在 Filter 的上下文中,通常只有一个子操作符):
private OpIterator[] children;
方法:
构造方法:
public Filter(Predicate p, OpIterator child)
1
2
3
4
5
6public Filter(Predicate p, OpIterator child) {
// TODO: some code goes here
this.predicate = p;
this.children = new OpIterator[1];
this.children[0] = child;
}返回谓词predicate:
public Predicate getPredicate()
返回待过滤的tuple属性:
public TupleDesc getTupleDesc()
开启过滤迭代器:
public void open()
1
2
3
4
5
6public void open() throws DbException, NoSuchElementException,
TransactionAbortedException {
// TODO: some code goes here
super.open();
children[0].open();
}
Filter是操作符Operator的子类,需要执行super.open()
方法。
关闭过滤迭代器:
public void close()
每次调用时,返回过滤后的下一个tuple:
protected Tuple fetchNext()
1
2
3
4
5
6
7
8
9
10
11
12protected Tuple fetchNext() throws NoSuchElementException,
TransactionAbortedException, DbException {
// TODO: some code goes here
while (children[0].hasNext()) {
Tuple tuple = children[0].next();
// predicate封装了过滤条件,包括要比较的字段、比较操作符(如等于、大于等)和比较值
if (predicate.filter(tuple)) {
return tuple;
}
}
return null;
}返回待过滤的tuples迭代器:
public OpIterator[] getChildren()
设置待过滤的tuples迭代器:
public void setChildren(OpIterator[] children)
JoinPredicate类
Join的辅助类,与Predicate类似,实现连接的条件,对两个tuple中的某一字段进行比较(类似where t1.age = t2.age
)。
参数:
tuple1进行比较的字段索引:
private int fieldIndex1;
tuple2进行比较的字段索引:
private int fieldIndex2;
执行的比较逻辑:
private Predicate.Op op;
方法:
构造方法:
public JoinPredicate(int field1, Predicate.Op op, int field2)
返回tuple1的字段索引:
public int getField1()
返回tuple2的字段索引:
public int getField2()
返回比较逻辑符:
public Predicate.Op getOperator()
对tuple1的字段和tuple2的字段进行比较:
public boolean filter(Tuple t1, Tuple t2)
1
2
3
4
5
6
7
8
9public boolean filter(Tuple t1, Tuple t2) {
// TODO: some code goes here
if (t1 == null || t2 == null) {
return false;
}
Field field1 = t1.getField(fieldIndex1);
Field field2 = t2.getField(fieldIndex2);
return field1.compare(op, field2);
}
Join类
和Filter类一样都实现了Operator接口,实现连接操作——对于给定两组tuple中满足JoinPredicate操作的两个tuple进行连接,连接方式采取最简单的嵌套循环连接。
参数:
通过JoinPredicate成员变量,实现对两个tuple的连接操作:
private JoinPredicate joinPredicate;
用于连接的tuples迭代器:
private OpIterator[] children;
Left Tuples迭代器:
children[0]
Right Tuples迭代器:
children[1]
fetchNext()
方法每次用嵌套循环选择children[0]中的一个tuple1和children[1]中一个符合条件的tuple2进行连接,记录当前children[0]中的tuple:private Tuple curTuple;
方法:
构造方法:
public Join(JoinPredicate p, OpIterator child1, OpIterator child2)
返回joinPredicate:
public JoinPredicate getJoinPredicate()
返回children[0](左)中进行比较的字段名:
public String getJoinField1Name()
1
2
3
4public String getJoinField1Name() {
// TODO: some code goes here
return children[0].getTupleDesc().getFieldName(joinPredicate.getField1());
}返回children[1](右)中进行比较的字段名:
public String getJoinField2Name()
返回连接后tuple的字段属性,即新表的tupleDesc:
public TupleDesc getTupleDesc()
1
2
3
4public TupleDesc getTupleDesc() {
// TODO: some code goes here
return TupleDesc.merge(children[0].getTupleDesc(), children[1].getTupleDesc());
}
调用TupleDesc类的merge操作完成两个tupleDesc的合并。
迭代器相关开启、关闭、重启
public void open()
public void close()
public void rewind()
返回连接生成的下一个结果:
protected Tuple fetchNext()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27protected Tuple fetchNext() throws TransactionAbortedException, DbException {
// TODO: some code goes here
while (children[0].hasNext() || curTuple != null) {
if (curTuple == null)
curTuple = children[0].next();
Tuple rightTuple;
while(children[1].hasNext()) {
rightTuple = children[1].next();
if (joinPredicate.filter(curTuple, rightTuple)) {
int len1 = curTuple.getTupleDesc().numFields();
int len2 = rightTuple.getTupleDesc().numFields();
Tuple combiTuple = new Tuple(getTupleDesc());
// join后的元组字段数等于左右两个元组字段数之和
for (int i = 0; i < len1; i++) {
combiTuple.setField(i, curTuple.getField(i));
}
for (int i = 0; i < len2; i++) {
combiTuple.setField(i + len1, rightTuple.getField(i));
}
return combiTuple;
}
}
curTuple = null;
children[1].rewind(); // 重置右侧子操作符的迭代器到初始状态
}
return null;
}
采用嵌套循环的连接方式,即两个for循环遍历比较每一对元组:先获取children[0]
中的一个tuple赋值给curTuple
,令curTuple
依次与children[1]
中的所有tuples进行比较,curTuple
与满足连接条件的tuple2进行连接并返回连接后的tuple,当完成children[1]
的一次遍历后,curTuple=children[0].next()
+children[1].rewind()
。
返回用于连接的tuples迭代器:
public OpIterator[] getChildren()
设置用于连接的tuples迭代器:
public void setChildren(OpIterator[] children)
Exercise2
实现IntegerAggregator、StringAggregator、Aggregate类,实现5种SQL聚合(COUNT、SUM、AVG、MIN、MAX),同时支持分组。
IntegerAggerator类
参数:
无分组默认StringField字段:
private static final Field
NO_GROUP_FIELD
= new StringField("NO_GROUP_FIELD", 20);
分组字段的索引:
private int groupByIndex;
分组字段的类型(目前只有int和String):
private Type groupByType;
聚合字段的索引:
private int aggregateIndex;
进行的聚合操作(5种SQL聚合操作):
private Op aggOp;
分组计算的map结果集
- 分组的结果(key–分组的字段,value–封装的结果类对象):
private Map<Field, GroupCalResult> groupCalMap;
辅助类GroupCalResult:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21private static class GroupCalResult {
// 根据不同的聚合操作,可能不会同时用到result和count,当聚合操作不使用该成员时设置默认值-1
public static final Integer DEFAULT_RES = 0;
public static final Integer Deactivate_RES = -1; // result成员变量无效
public static final Integer DEFAULT_COUNT = 0;
public static final Integer Deactivate_COUNT = -1; // count成员变量无效
/**
* 当前分组计算的结果: sum, avg, max, min, count
*/
private Integer result;
/**
* 当前Field出现的频率
*/
private Integer count;
public GroupCalResult(int result , int count) {
this.result = result;
this.count = count;
}
}- 封装分组聚合的结果集,方便迭代器遍历查看结果(key–分组的字段,value–聚合元组tuple):
private Map<Field, Tuple> resultMap;
- 分组的结果(key–分组的字段,value–封装的结果类对象):
聚合后tuple的tupleDesc:
private TupleDesc aggDesc;
方法:
构造方法:
public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what)
根据是否有group分组确定聚合后的表字段属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
// TODO: some code goes here
this.groupByIndex = gbfield;
this.groupByType = gbfieldtype;
this.aggregateIndex = afield;
this.aggOp = what;
// init map
this.groupCalMap = new ConcurrentHashMap<>();
this.resultMap = new ConcurrentHashMap<>();
if (groupByIndex >= 0) {
// 有groupBy
this.aggDesc = new TupleDesc(new Type[]{groupByType, Type.INT_TYPE}, new String[]{"groupVal", "aggregateVal"});
} else {
// 无groupBy
this.aggDesc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"aggregateVal"});
}
}分组聚合操作的执行方法:
public void mergeTupleIntoGroup(Tuple tup)
聚合操作的流程是:先读取一个tuple进行聚合操作,得到一个只聚合了一个tuple的聚合结果,之后每读取一个tuple就将其加入到聚合结果中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public void mergeTupleIntoGroup(Tuple tup) {
// TODO: some code goes here
Field groupByField = this.groupByIndex == NO_GROUPING ? NO_GROUP_FIELD : tup.getField(groupByIndex);
// 分组字段类型检查
if(!NO_GROUP_FIELD.equals(groupByField) && groupByField.getType() != groupByType) {
throw new IllegalArgumentException("Expected groupByType: 「" + groupByType + "」, but got: " + groupByField.getType());
}
// 聚合字段类型检查
if(!(tup.getField(aggregateIndex) instanceof IntField)) {
throw new IllegalArgumentException("Expected aggType is 「IntField」, but got: " + tup.getField(aggregateIndex).getClass());
}
// 1、store
groupStore(tup, groupByField);
// 2、cal
Tuple curCalTuple = calResult(groupByField);
// 3、update
resultMap.put(groupByField, curCalTuple);
}
分组聚合核心算法:根据不同的聚合操作,GroupCalResult(result, count)封装了分组聚合的结果。例如MIN
操作,GroupCalResult的count成员是无效的,故将其置为*Deactivate_COUNT
。*
计算聚合后的结果,如果存在分组则结果字段属性为“分组字段+结果”,若不存在分组则结果字段属性为“结果”。
返回聚合结果的迭代器:
public OpIterator iterator()
1
2
3
4public OpIterator iterator() {
// TODO: some code goes here
return new IntAggTupIterator();
}聚合结果的迭代器辅助类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36private class IntAggTupIterator implements OpIterator {
private boolean open = false;
private Iterator<Map.Entry<Field, Tuple>> iter;
public void open() {
iter = resultMap.entrySet().iterator();
open = true;
}
public void close() {
open = false;
}
public boolean hasNext() {
return open && iter.hasNext();
}
public Tuple next() {
return iter.next().getValue();
}
public void rewind() {
this.close();
this.open();
}
public TupleDesc getTupleDesc() {
return aggDesc;
}
}
StringAggerator类
对String类型的字段实现分组聚合操作,思路和IntegerAggerator一致,不过对于String类型来说只需要实现COUNT
聚合操作。
参数:
无分组默认StringField字段:
private static final Field
NO_GROUP_FIELD
= new StringField("NO_GROUP_FIELD", 20);
分组字段的索引:
private int groupByIndex;
分组字段的类型:
private Type groupByType;
聚合字段的索引:
private int aggregateIndex;
聚合结果的字段属性:
private TupleDesc aggDesc;
分组计算的map结果集:
分组的结果:
private Map<Field, Integer> groupCalMap;
因为只有
COUNT
聚合操作符,所以不需要使用GroupCalResult辅助类来区分result和count,value直接使用Integer类型即可。private Map<Field, Tuple> resultMap;
方法:
构造方法:
public StringAggregator(int gbfield, Type gbfieldtype, int afield, Op what)
分组聚合操作的执行方法:
public void mergeTupleIntoGroup(Tuple tup)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public void mergeTupleIntoGroup(Tuple tup) {
// TODO: some code goes here
Field groupField = groupByIndex >= 0 ? tup.getField(groupByIndex) : NO_GROUP_FIELD;
// 分组字段类型检查
if (!NO_GROUP_FIELD.equals(groupField) && groupField.getType() != groupByType) {
throw new IllegalArgumentException("Expected groupByType: 「" + groupByType + "」, but got: " + groupField.getType());
}
// 聚合字段类型检查
if (!(tup.getField(aggregateIndex) instanceof StringField)) {
throw new IllegalArgumentException("Expected aggregateType: 「" + Type.STRING_TYPE + "」, but got: " + tup.getField(aggregateIndex).getType());
}
// 1、store
groupCalMap.put(groupField, this.groupCalMap.getOrDefault(groupField, 0) + 1);
// 2、cal
Tuple curCalTuple = new Tuple(aggDesc);
if (groupByIndex >= 0) {
curCalTuple.setField(0, groupField);
curCalTuple.setField(1, new IntField(groupCalMap.get(groupField)));
} else {
curCalTuple.setField(0, new IntField(groupCalMap.get(groupField)));
}
// 3、update
resultMap.put(groupField, curCalTuple);
}返回聚合结果的迭代器:
public OpIterator iterator()
1
2
3
4public OpIterator iterator() {
// TODO: some code goes here
return new StringAggTupIterator();
}聚合结果的迭代器辅助类(同IntegerAggerator)
Aggerator类
对IntegerAggerator、StringAggerator类进行封装。
参数:
需要聚合的tuples迭代器:
OpIterator[] children;
聚合字段的索引:
private int aggregateIndex;
分组字段的索引:
private int groupByIndex;
聚合操作(5种SQL聚合操作):
private Op aggregateOp;
进行聚合操作的类(IntegerAggregator或StringAggregator):
private Aggregator aggregator;
存放聚合结果的迭代器:
private OpIterator resultIterator;
方法:
构造方法:
public Aggregate(OpIterator child, int afield, int gfield, Aggregator.Op aop)
注意为aggregator聚合操作赋值,判断是IntAggregate还是StringAggregate。
1
2
3
4
5
6
7
8
9
10
11
12
13
14public Aggregate(OpIterator child, int afield, int gfield, Aggregator.Op aop) {
// TODO: some code goes here
this.children = new OpIterator[] {child};
this.aggregateIndex = afield;
this.groupByIndex = gfield;
this.aggregateOp = aop;
Type groupType = gfield != Aggregator.NO_GROUPING ? child.getTupleDesc().getFieldType(gfield) : Type.STRING_TYPE; // 如果没有分组字段,则默认为字符串类型
if (child.getTupleDesc().getFieldType(afield) == Type.INT_TYPE) {
this.aggregator = new IntegerAggregator(gfield, groupType, afield, aop);
} else {
this.aggregator = new StringAggregator(gfield, groupType, afield, aop);
}
}返回分组的字段索引:
public int groupField()
返回分组的字段名:
public String groupFieldName()
返回聚合的字段索引:
public int aggregateField()
返回聚合的字段名:
public String aggregateFieldName()
返回聚合操作符:
public Aggregator.Op aggregateOp()
迭代器相关
聚合操作的核心逻辑代码:
public void open()
将IntAggerator和StringAggerator聚合结果封装在resultIterator迭代器中。
1
2
3
4
5
6
7
8
9
10
11
12public void open() throws NoSuchElementException, DbException,
TransactionAbortedException {
// TODO: some code goes here
super.open();
children[0].open();
while (this.children[0].hasNext()) {
Tuple nextTuple = children[0].next();
aggregator.mergeTupleIntoGroup(nextTuple); // 逐行进行聚合操作
}
resultIterator = aggregator.iterator();
resultIterator.open();
}protected Tuple fetchNext()
1
2
3
4
5
6
7
8protected Tuple fetchNext() throws TransactionAbortedException, DbException {
// TODO: some code goes here
if (resultIterator.hasNext()) {
return resultIterator.next();
} else {
return null;
}
}public void close()
public void rewind()
获取聚合结果的字段属性(
不是原tuples迭代器的字段属性):1
2
3
4public TupleDesc getTupleDesc() {
// TODO: some code goes here
return aggregator.iterator().getTupleDesc();
}
Exercise3
实现HeapPage、HeapFile、BufferPool中的insertTuple、deleteTuple
要实现对数据表数据的增添和删除,先从文件和物理页的层次开始。
Removing Tuples:要移除一个Tuple需要实现deleteTuple,Tuple包含RecordID能够帮助找到tuple存储的物理页,所以思路就是通过RecordID找到对应的物理页,并修改物理页的header。
Adding Tuples:对于文件层面,insertTuple方法负责添加一个tuple到HeapFile中,大致思路是:到页中找一个空闲的slot;如果HeapFile不存在这样的页就需要新建一个页,并添加新页到磁盘上。同时还要保证新添tuple的RecordID正确更新。
在insert和delete中BufferPool、HeapFile、HeapPage的作用:
对于BufferPool和HeapFile来说,是相互调用的关系:HeapFile调用BufferPool的getPage方法获取数据页:在页面存在于BufferPool中时直接从BufferPool获取;不存在时还需调用HeapFile的readPage方法。
1 | public Page getPage(TransactionId tid, PageId pid, Permissions perm) |
HeapPage类
参数:
脏页标志位:
private boolean dirtyFlag;
产生脏页的事务id:
private TransactionId dirtyTid;
方法:
选择一个空slot插入tuple,同时修改该slot对应header的bitmap,表示该slot已被数据占用:
public void insertTuple(Tuple t)
插入数据的三个要点:① 设置slot的bitmap; ② 设置tuple在磁盘的位置recordID; ③ 向页面中插入真实数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public void insertTuple(Tuple t) throws DbException {
// TODO: some code goes here
// not necessary for lab1
if (getNumUnusedSlots() == 0 || !t.getTupleDesc().equals(td)) {
throw new DbException("page is full or tupleDesc is mismatch.");
}
for (int i = 0; i < numSlots; i++) {
if (!isSlotUsed(i)) {
markSlotUsed(i, true); // 设置slot的bitmap
t.setRecordId(new RecordId(pid, i)); // 设置recordID
tuples[i] = t; // 向页面插入数据
return;
}
}
}删除指定的tuple,同时修改slot对应的bitmap,表示该slot已为空:
public void deleteTuple(Tuple t)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public void deleteTuple(Tuple t) throws DbException {
// TODO: some code goes here
int tid = t.getRecordId().getTupleNumber();
boolean flag = false;
for (int i = 0; i < tuples.length; i++) {
if (t.equals(tuples[i])) {
if (!isSlotUsed(i)) {
throw new DbException("tuple slot is already empty");
}
markSlotUsed(i, false);
tuples[tid] = null; // delete
flag = true;
}
}
if (!flag) {
throw new DbException("the tuple is not on this page");
}
}修改脏页标志位:
public void markDirty()
如果该page是脏页则返回产生脏页的事务id:
public TransactionId isDirty()
修改page中的header,value为true表示在第i位设置为1,value为false表示在第i位设置为0:
private void markSlotUsed(int i, boolean value)
1
2
3
4
5
6
7
8
9
10
11
12
13
14private void markSlotUsed(int i, boolean value) {
// TODO: some code goes here
// not necessary for lab1
int iTh = i / 8;
int bitTh = i % 8;
int onBit = (header[iTh] >> bitTh) & 1; // 判断该slot位是否被使用
if (onBit == 0 && value) {
// 未使用,现在使用
header[iTh] += (byte) (1 << bitTh);
} else if (onBit == 1 && !value) {
// 已使用,现在未使用
header[iTh] -= (byte) (1 << bitTh);
}
}
HeapFile类
方法:
将page写入磁盘,同时清除脏页标志:
public void writePage(Page page)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public void writePage(Page page) throws IOException {
// TODO: some code goes here
// not necessary for lab1
PageId pageId = page.getId();
int pageNo = pageId.getPageNumber();
int offset = pageNo * BufferPool.getPageSize();
byte[] pageData = page.getPageData();
RandomAccessFile file = new RandomAccessFile(f, "rw");
file.seek(offset);
file.write(pageData);
file.close();
// 清除脏页
page.markDirty(false, null);
}将tuple插入到HeapFile的page中,如果HeapFile的page都满了还要在HeapFile中创建一个新的page:
public List<Page> insertTuple(TransactionId tid, Tuple t)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public List<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
// TODO: some code goes here
// not necessary for lab1
ArrayList<Page> pageList = new ArrayList<>();
for (int i = 0; i < numPages(); i++) {
HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid, new HeapPageId(getId(), i),
Permissions.READ_WRITE);
if (p.getNumUnusedSlots() == 0) {
// 当前页面没有空闲槽位
continue;
}
p.insertTuple(t); // 调用HeapPage的insertTuple方法
pageList.add(p);
return pageList;
}
// 没有找到合适的页面,需要创建新的页面
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(f, true));
byte[] emptyData = HeapPage.createEmptyPageData();
bos.write(emptyData);
bos.close();
// 加载入BufferPool
HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid, new HeapPageId(getId(), numPages()-1),
Permissions.READ_WRITE); // pgNo: numPages()方法的返回值是基于文件当前的大小动态计算的
p.insertTuple(t);
pageList.add(p);
return pageList;
}创建新page时将
pageNo
设置为numPages()-1
而不是numPages()
是因为numPages()
方法的返回值是基于文件当前的大小动态计算的,默认新page的pageNo是最后一个。BufferedInputStream和BufferedOutputStream类就是实现缓冲功能的输入/输出流,使用带缓冲的输入/输出流,效率更高、速度更快。
使用步骤:①创建FileOutputStream对象,构造方法中绑定要输出的目标文件; ②创建BufferOutputStream对象,使用write方法将数据写入内部缓冲区; ③使用BufferOutputStream的flush方法将缓冲区数据刷新到文件; ④释放资源。
1
2
3BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream("bos.txt")) ;
bos.write("hello".getBytes()); //写数据
bos.close(); //释放资源将HeapFile中某一个page上的tuple从该page中删除:
public List<Page> deleteTuple(TransactionId tid, Tuple t)
1
2
3
4
5
6
7
8public List<Page> deleteTuple(TransactionId tid, Tuple t) throws DbException,
TransactionAbortedException {
// TODO: some code goes here
// not necessary for lab1
HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid, t.getRecordId().getPageId(), Permissions.READ_WRITE);
p.deleteTuple(t);
return Collections.singletonList(p);
}
根据tuple的RecordID属性也可以通过getPageId找到tuple所在的page页。
可以自行对比一下lab1和lab2中方法实现的区别:
BufferPool类
insert和delete这里BufferPool涉及到页面置换策略,其实也就是LRU策略。所以这部分实现放到Exercise5中一起来做。
Exercise4
实现Insert类、Delete类,对exercise3的功能进行封装
实现Insert和Delete操作符,用于修改磁盘上的页数据,这些操作符返回受影响的tuples个数(受影响的行数)。
Insert类
参数:
执行插入操作的事务id:
private TransactionId tid;
待插入的tuples的迭代器:
private OpIterator[] children;
待插入的表id:
private int tableId;
fetchNext方法会返回一个标识插入了多少行的tuple结果,tupleDesc就是该tuple的属性行:
private TupleDesc tupleDesc;
1
this.tupleDesc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"insertNums"});
插入受影响行数的tuple结果(最主要的作用是避免fetchNext方法无限制的向下取,保证只调用一次):
private Tuple insertRes;
方法:
构造方法:
public Insert(TransactionId t, OpIterator child, int tableId)
执行插入操作,返回包含插入受影响行数的tuple:
protected Tuple fetchNext()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20protected Tuple fetchNext() throws TransactionAbortedException, DbException {
// TODO: some code goes here
// 保证只调用一次,多次调用返回null
if (insertRes != null) {
return null;
}
int insertNums = 0;
while (children[0].hasNext()) {
try {
Database.getBufferPool().insertTuple(tid, tableId, children[0].next());
insertNums++;
} catch (IOException e) {
System.out.println("Insert tuples into database failed!");
throw new RuntimeException(e);
}
}
insertRes = new Tuple(tupleDesc); // 计算插入操作影响的行数
insertRes.setField(0, new IntField(insertNums));
return insertRes;
}
Delete类
封装类的实现和Insert类完全相同。
1 | public void deleteTuple(Tuple t) throws DbException { |
Exercise5
Eviction Policy有很多,可以自己选择,我选择的是最常见的LRU策略。
Lab1中在BufferPool满后会抛出异常,这里实现了BufferPool的页面置换策略,采取的是LRU的策略(详见LeetCode中的实现 https://leetcode.cn/problems/lru-cache/description/?envType=study-plan-v2\&envId=top-100-liked )。
BufferPool类
原先采用的是HashMap保存pageId和page的映射:private final Map<PageId, Page> bufferPools = new ConcurrentHashMap<>();
,加入页面置换策略后改用自定义的LRUCache类(维护一个双向链表)来存储pageId和page的映射关系。
同时还涉及exercise3中BufferPool的插入、删除操作。
参数:
LRUCache类:
private static class LRUCache
为了实现LRU算法,需要维护一个双向链表LRUCache,用于记录每个PageId的访问顺序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101/**
* 为了实现LRU算法,需要维护一个双向链表,用于记录每个PageId的访问顺序
*/
private static class LRUCache {
int capacity, size;
ConcurrentHashMap<PageId, Node> map;
// 头节点和尾节点:标志位无数据
Node head = new Node();
Node tail = new Node();
public LRUCache(int capacity) {
this.capacity = capacity;
this.size = 0;
map = new ConcurrentHashMap<>();
head.next = tail;
tail.prev = head;
}
public synchronized Page get(PageId key) {
if (map.containsKey(key)) {
Node node = map.get(key);
// 定位到链表头
moveToHead(node);
return node.val;
} else {
return null;
}
}
public synchronized void put(PageId key, Page val) {
if (map.containsKey(key)) {
// 更新value
Node node = map.get(key);
node.val = val;
moveToHead(node);
} else {
Node newNode = new Node(key, val);
map.put(key, newNode);
// 添加到链表头
addToHead(newNode);
size++;
if (size > capacity) {
// 移除链表尾
Node node = removeTail();
map.remove(node.key);
size--;
}
}
}
// 添加到链表头部
private synchronized void addToHead(Node node) {
node.prev = head;
node.next = head.next;
head.next.prev = node;
head.next = node;
}
// 移动到链表头部
private synchronized void moveToHead(Node node) {
// 先从原位置删除node
removeNode(node);
// 再将node插入链表头部
addToHead(node);
}
// 删除node节点
private void removeNode(Node node) {
node.prev.next = node.next;
node.next.prev = node.prev;
}
// 删除尾部节点
private Node removeTail() {
Node node = tail.prev;
removeNode(node);
return node;
}
private synchronized int getSize() {
return size;
}
private static class Node {
PageId key;
Page val;
Node prev;
Node next;
public Node() {}
public Node(PageId key, Page val) {
this.key = key;
this.val = val;
}
}
public Set<Map.Entry<PageId, Node>> getEntrySet() {
return map.entrySet();
}
}
方法:
调用HeapFile的
insertTuple
方法插入元组,将返回的结果保存到BufferPool:public void insertTuple(TransactionId tid, int tableId, Tuple t)
1
2
3
4
5
6
7
8public void insertTuple(TransactionId tid, int tableId, Tuple t)
throws DbException, IOException, TransactionAbortedException {
// TODO: some code goes here
// not necessary for lab1
DbFile f = Database.getCatalog().getDatabaseFile(tableId);
List<Page> updatePages = f.insertTuple(tid, t);
updateBufferPool(updatePages, tid);
}调用HeapFile的
deleteTuple
方法删除元组,将结果保存到BufferPool:public void deleteTuple(TransactionId tid, Tuple t)
1
2
3
4
5
6
7
8public void deleteTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
// TODO: some code goes here
// not necessary for lab1
DbFile f = Database.getCatalog().getDatabaseFile(t.getRecordId().getPageId().getTableId());
List<Page> updatePages = f.deleteTuple(tid, t);
updateBufferPool(updatePages, tid);
}更新BufferPool的存储:
public void updateBufferPool(List<Page> updatePages, TransactionId id)
注意将页面设置为脏页,因为在BufferPool中修改page后,和磁盘中的page不一致了。
1
2
3
4
5
6
7public void updateBufferPool(List<Page> updatePages, TransactionId id) {
for (Page page: updatePages) {
page.markDirty(true, id); // 设置为脏页(因为在BufferPool中修改page后,和磁盘中的page不一致了)
// update BufferPool
lruCache.put(page.getId(), page);
}
}从BufferPool中移除页面:
public synchronized void removePage (PageId pid)
1
2
3
4
5
6
7public synchronized void removePage(PageId pid) {
// TODO: some code goes here
// not necessary for lab1
if (pid != null) {
lruCache.map.remove(pid);
}
}将某个页面如果是脏页则刷新到磁盘:
private synchronized void flushPage(PageId pid)
1
2
3
4
5
6
7
8
9
10
11
12
13
14private synchronized void flushPage(PageId pid) throws IOException {
// TODO: some code goes here
// not necessary for lab1
Page page = lruCache.get(pid);
if (page == null) {
return;
}
TransactionId tid = page.isDirty();
if (tid != null) {
Page before = page.getBeforeImage();
Database.getLogFile().logWrite(tid, before, page);
Database.getCatalog().getDatabaseFile(pid.getTableId()).writePage(page);
}
}将所有脏页刷新到磁盘:
public synchronized void flushAllPages()
1
2
3
4
5
6
7
8
9
10public synchronized void flushAllPages() throws IOException {
// TODO: some code goes here
// not necessary for lab1
for (Map.Entry<PageId, LRUCache.Node> group: lruCache.getEntrySet()) {
Page page = group.getValue().val;
if (page.isDirty() != null) {
flushPage(group.getKey()); // 将不是脏页的页面刷新页面到磁盘
}
}
}
要判断要删除的目标元组的字段是否和提供的相等,而不是地址相等: