MIT6.5830 的数据库课程。Lab2 主要是完成查询的连接和过滤、实现聚合下的查询操作、对表的插入删除以及实现数据库中非常重要的 LRU 页面置换算法。

任务目录

  • Exercise1:实现查询的连接和过滤

  • Exercise2:实现聚合操作MIN、MAX、COUNT、SUM、AVG

  • Exercise3:实现修改表的方法,完成添加和删除

  • Exercise4:实现Insert和Delete操作符

  • Exercise5:实现BufferPool的页面置换算法

Exercise1

实现Predicate、JoinPredicate、Filter、Join

Predicate类

Filter的辅助类,用于筛选满足条件的tuple:将tuple中的字段与指定字段进行比较,实现对单个tuple的过滤操作(比较逻辑有==、>=、<=、>、<、!=、LIKE等)。

参数:

  • 指定的比较字段(也是Field.compare()的参数):private Field operand;

  • tuple中指定字段对应字段的索引:private int fieldIndex;

  • 执行的比较逻辑,运算操作符(也是Field.compare()的参数):private Op op;

方法:

  • 构造方法:public Predicate(int field, Op op, Field operand)

  • 返回tuple中对应字段的索引:public int getField()

  • 返回运算操作符:public Op getOp()

  • 返回指定的比较字段:public Field getOperand()

  • 对元组t进行比较:public boolean filter(Tuple t)

    1
    2
    3
    4
    5
    public boolean filter(Tuple t) {
    // TODO: some code goes here
    Field otherOperand = t.getField(fieldIndex);
    return otherOperand.compare(op, operand); // 例如: otherOperand >= operand
    }

IntField和StringField分别实现了Field接口中的compare(op, operand)方法。

  • 返回展示结果:public String toString()

Filter类

Filter类实现了Operator接口,根据Predicate的判断结果,得到满足结果的tuples,实现了类似于where age > 16的过滤操作。

参数:

  • 通过Predicate成员变量,实现对每一个tuple的过滤操作:private Predicate predicate;

  • 待过滤的tuples迭代器,处理输入的元组流,并产生输出的元组流(在 Filter 的上下文中,通常只有一个子操作符):private OpIterator[] children;

方法:

  • 构造方法:public Filter(Predicate p, OpIterator child)

    1
    2
    3
    4
    5
    6
    public Filter(Predicate p, OpIterator child) {
    // TODO: some code goes here
    this.predicate = p;
    this.children = new OpIterator[1];
    this.children[0] = child;
    }
  • 返回谓词predicate:public Predicate getPredicate()

  • 返回待过滤的tuple属性:public TupleDesc getTupleDesc()

  • 开启过滤迭代器:public void open()

    1
    2
    3
    4
    5
    6
    public void open() throws DbException, NoSuchElementException,
    TransactionAbortedException {
    // TODO: some code goes here
    super.open();
    children[0].open();
    }

Filter是操作符Operator的子类,需要执行super.open()方法。

  • 关闭过滤迭代器:public void close()

  • 每次调用时,返回过滤后的下一个tuple:protected Tuple fetchNext()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    protected Tuple fetchNext() throws NoSuchElementException,
    TransactionAbortedException, DbException {
    // TODO: some code goes here
    while (children[0].hasNext()) {
    Tuple tuple = children[0].next();
    // predicate封装了过滤条件,包括要比较的字段、比较操作符(如等于、大于等)和比较值
    if (predicate.filter(tuple)) {
    return tuple;
    }
    }
    return null;
    }
  • 返回待过滤的tuples迭代器:public OpIterator[] getChildren()

  • 设置待过滤的tuples迭代器:public void setChildren(OpIterator[] children)

JoinPredicate类

Join的辅助类,与Predicate类似,实现连接的条件,对两个tuple中的某一字段进行比较(类似where t1.age = t2.age)。

参数:

  • tuple1进行比较的字段索引:private int fieldIndex1;

  • tuple2进行比较的字段索引:private int fieldIndex2;

  • 执行的比较逻辑:private Predicate.Op op;

方法:

  • 构造方法:public JoinPredicate(int field1, Predicate.Op op, int field2)

  • 返回tuple1的字段索引:public int getField1()

  • 返回tuple2的字段索引:public int getField2()

  • 返回比较逻辑符:public Predicate.Op getOperator()

  • 对tuple1的字段和tuple2的字段进行比较:public boolean filter(Tuple t1, Tuple t2)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public boolean filter(Tuple t1, Tuple t2) {
    // TODO: some code goes here
    if (t1 == null || t2 == null) {
    return false;
    }
    Field field1 = t1.getField(fieldIndex1);
    Field field2 = t2.getField(fieldIndex2);
    return field1.compare(op, field2);
    }

Join类

和Filter类一样都实现了Operator接口,实现连接操作——对于给定两组tuple中满足JoinPredicate操作的两个tuple进行连接,连接方式采取最简单的嵌套循环连接

参数:

  • 通过JoinPredicate成员变量,实现对两个tuple的连接操作:private JoinPredicate joinPredicate;

  • 用于连接的tuples迭代器:private OpIterator[] children;

    • Left Tuples迭代器: children[0]

    • Right Tuples迭代器: children[1]

  • fetchNext()方法每次用嵌套循环选择children[0]中的一个tuple1和children[1]中一个符合条件的tuple2进行连接,记录当前children[0]中的tuple:private Tuple curTuple;

方法:

  • 构造方法:public Join(JoinPredicate p, OpIterator child1, OpIterator child2)

  • 返回joinPredicate:public JoinPredicate getJoinPredicate()

  • 返回children[0](左)中进行比较的字段名:public String getJoinField1Name()

    1
    2
    3
    4
    public String getJoinField1Name() {
    // TODO: some code goes here
    return children[0].getTupleDesc().getFieldName(joinPredicate.getField1());
    }
  • 返回children[1](右)中进行比较的字段名:public String getJoinField2Name()

  • 返回连接后tuple的字段属性,即新表的tupleDesc:public TupleDesc getTupleDesc()

    1
    2
    3
    4
    public TupleDesc getTupleDesc() {
    // TODO: some code goes here
    return TupleDesc.merge(children[0].getTupleDesc(), children[1].getTupleDesc());
    }

调用TupleDesc类的merge操作完成两个tupleDesc的合并。

  • 迭代器相关开启、关闭、重启

    • public void open()

    • public void close()

    • public void rewind()

  • 返回连接生成的下一个结果:protected Tuple fetchNext()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    // TODO: some code goes here
    while (children[0].hasNext() || curTuple != null) {
    if (curTuple == null)
    curTuple = children[0].next();
    Tuple rightTuple;
    while(children[1].hasNext()) {
    rightTuple = children[1].next();
    if (joinPredicate.filter(curTuple, rightTuple)) {
    int len1 = curTuple.getTupleDesc().numFields();
    int len2 = rightTuple.getTupleDesc().numFields();
    Tuple combiTuple = new Tuple(getTupleDesc());
    // join后的元组字段数等于左右两个元组字段数之和
    for (int i = 0; i < len1; i++) {
    combiTuple.setField(i, curTuple.getField(i));
    }
    for (int i = 0; i < len2; i++) {
    combiTuple.setField(i + len1, rightTuple.getField(i));
    }
    return combiTuple;
    }
    }
    curTuple = null;
    children[1].rewind(); // 重置右侧子操作符的迭代器到初始状态
    }
    return null;
    }

采用嵌套循环的连接方式,即两个for循环遍历比较每一对元组:先获取children[0]中的一个tuple赋值给curTuple,令curTuple依次与children[1]中的所有tuples进行比较,curTuple与满足连接条件的tuple2进行连接并返回连接后的tuple,当完成children[1]的一次遍历后,curTuple=children[0].next() +children[1].rewind()

  • 返回用于连接的tuples迭代器:public OpIterator[] getChildren()

  • 设置用于连接的tuples迭代器:public void setChildren(OpIterator[] children)

Exercise2

实现IntegerAggregator、StringAggregator、Aggregate类,实现5种SQL聚合(COUNT、SUM、AVG、MIN、MAX),同时支持分组。

IntegerAggerator类

参数:

  • 无分组默认StringField字段:private static final Field NO_GROUP_FIELD = new StringField("NO_GROUP_FIELD", 20);

  • 分组字段的索引:private int groupByIndex;

  • 分组字段的类型(目前只有int和String):private Type groupByType;

  • 聚合字段的索引:private int aggregateIndex;

  • 进行的聚合操作(5种SQL聚合操作):private Op aggOp;

  • 分组计算的map结果集

    • 分组的结果(key–分组的字段,value–封装的结果类对象):private Map<Field, GroupCalResult> groupCalMap;

    辅助类GroupCalResult:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    private static class GroupCalResult {
    // 根据不同的聚合操作,可能不会同时用到result和count,当聚合操作不使用该成员时设置默认值-1
    public static final Integer DEFAULT_RES = 0;
    public static final Integer Deactivate_RES = -1; // result成员变量无效
    public static final Integer DEFAULT_COUNT = 0;
    public static final Integer Deactivate_COUNT = -1; // count成员变量无效
    /**
    * 当前分组计算的结果: sum, avg, max, min, count
    */
    private Integer result;

    /**
    * 当前Field出现的频率
    */
    private Integer count;

    public GroupCalResult(int result , int count) {
    this.result = result;
    this.count = count;
    }
    }
    • 封装分组聚合的结果集,方便迭代器遍历查看结果(key–分组的字段,value–聚合元组tuple):private Map<Field, Tuple> resultMap;
  • 聚合后tuple的tupleDesc:private TupleDesc aggDesc;

方法:

  • 构造方法:public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what)

    根据是否有group分组确定聚合后的表字段属性。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
    // TODO: some code goes here
    this.groupByIndex = gbfield;
    this.groupByType = gbfieldtype;
    this.aggregateIndex = afield;
    this.aggOp = what;

    // init map
    this.groupCalMap = new ConcurrentHashMap<>();
    this.resultMap = new ConcurrentHashMap<>();

    if (groupByIndex >= 0) {
    // 有groupBy
    this.aggDesc = new TupleDesc(new Type[]{groupByType, Type.INT_TYPE}, new String[]{"groupVal", "aggregateVal"});
    } else {
    // 无groupBy
    this.aggDesc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"aggregateVal"});
    }
    }
  • 分组聚合操作的执行方法:public void mergeTupleIntoGroup(Tuple tup)

    聚合操作的流程是:先读取一个tuple进行聚合操作,得到一个只聚合了一个tuple的聚合结果,之后每读取一个tuple就将其加入到聚合结果中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public void mergeTupleIntoGroup(Tuple tup) {
    // TODO: some code goes here
    Field groupByField = this.groupByIndex == NO_GROUPING ? NO_GROUP_FIELD : tup.getField(groupByIndex);
    // 分组字段类型检查
    if(!NO_GROUP_FIELD.equals(groupByField) && groupByField.getType() != groupByType) {
    throw new IllegalArgumentException("Expected groupByType: 「" + groupByType + "」, but got: " + groupByField.getType());
    }
    // 聚合字段类型检查
    if(!(tup.getField(aggregateIndex) instanceof IntField)) {
    throw new IllegalArgumentException("Expected aggType is 「IntField」, but got: " + tup.getField(aggregateIndex).getClass());
    }

    // 1、store
    groupStore(tup, groupByField);
    // 2、cal
    Tuple curCalTuple = calResult(groupByField);
    // 3、update
    resultMap.put(groupByField, curCalTuple);
    }

分组聚合核心算法:根据不同的聚合操作,GroupCalResult(result, count)封装了分组聚合的结果。例如MIN操作,GroupCalResult的count成员是无效的,故将其置为*Deactivate_COUNT。*

计算聚合后的结果,如果存在分组则结果字段属性为“分组字段+结果”,若不存在分组则结果字段属性为“结果”。

  • 返回聚合结果的迭代器:public OpIterator iterator()

    1
    2
    3
    4
    public OpIterator iterator() {
    // TODO: some code goes here
    return new IntAggTupIterator();
    }

    聚合结果的迭代器辅助类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    private class IntAggTupIterator implements OpIterator {
    private boolean open = false;
    private Iterator<Map.Entry<Field, Tuple>> iter;

    @Override
    public void open() {
    iter = resultMap.entrySet().iterator();
    open = true;
    }

    @Override
    public void close() {
    open = false;
    }

    @Override
    public boolean hasNext() {
    return open && iter.hasNext();
    }

    @Override
    public Tuple next() {
    return iter.next().getValue();
    }

    @Override
    public void rewind() {
    this.close();
    this.open();
    }

    @Override
    public TupleDesc getTupleDesc() {
    return aggDesc;
    }
    }

StringAggerator类

对String类型的字段实现分组聚合操作,思路和IntegerAggerator一致,不过对于String类型来说只需要实现COUNT聚合操作。

参数:

  • 无分组默认StringField字段:private static final Field NO_GROUP_FIELD = new StringField("NO_GROUP_FIELD", 20);

  • 分组字段的索引:private int groupByIndex;

  • 分组字段的类型:private Type groupByType;

  • 聚合字段的索引:private int aggregateIndex;

  • 聚合结果的字段属性:private TupleDesc aggDesc;

  • 分组计算的map结果集:

    • 分组的结果:private Map<Field, Integer> groupCalMap;

      因为只有COUNT聚合操作符,所以不需要使用GroupCalResult辅助类来区分result和count,value直接使用Integer类型即可。

    • private Map<Field, Tuple> resultMap;

方法:

  • 构造方法:public StringAggregator(int gbfield, Type gbfieldtype, int afield, Op what)

  • 分组聚合操作的执行方法:public void mergeTupleIntoGroup(Tuple tup)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public void mergeTupleIntoGroup(Tuple tup) {
    // TODO: some code goes here
    Field groupField = groupByIndex >= 0 ? tup.getField(groupByIndex) : NO_GROUP_FIELD;
    // 分组字段类型检查
    if (!NO_GROUP_FIELD.equals(groupField) && groupField.getType() != groupByType) {
    throw new IllegalArgumentException("Expected groupByType: 「" + groupByType + "」, but got: " + groupField.getType());
    }
    // 聚合字段类型检查
    if (!(tup.getField(aggregateIndex) instanceof StringField)) {
    throw new IllegalArgumentException("Expected aggregateType: 「" + Type.STRING_TYPE + "」, but got: " + tup.getField(aggregateIndex).getType());
    }
    // 1、store
    groupCalMap.put(groupField, this.groupCalMap.getOrDefault(groupField, 0) + 1);
    // 2、cal
    Tuple curCalTuple = new Tuple(aggDesc);
    if (groupByIndex >= 0) {
    curCalTuple.setField(0, groupField);
    curCalTuple.setField(1, new IntField(groupCalMap.get(groupField)));
    } else {
    curCalTuple.setField(0, new IntField(groupCalMap.get(groupField)));
    }
    // 3、update
    resultMap.put(groupField, curCalTuple);
    }
  • 返回聚合结果的迭代器:public OpIterator iterator()

    1
    2
    3
    4
    public OpIterator iterator() {
    // TODO: some code goes here
    return new StringAggTupIterator();
    }

    聚合结果的迭代器辅助类(同IntegerAggerator)

Aggerator类

对IntegerAggerator、StringAggerator类进行封装。

参数:

  • 需要聚合的tuples迭代器:OpIterator[] children;

  • 聚合字段的索引:private int aggregateIndex;

  • 分组字段的索引:private int groupByIndex;

  • 聚合操作(5种SQL聚合操作):private Op aggregateOp;

  • 进行聚合操作的类(IntegerAggregator或StringAggregator):private Aggregator aggregator;

  • 存放聚合结果的迭代器:private OpIterator resultIterator;

方法:

  • 构造方法:public Aggregate(OpIterator child, int afield, int gfield, Aggregator.Op aop)

    注意为aggregator聚合操作赋值,判断是IntAggregate还是StringAggregate。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public Aggregate(OpIterator child, int afield, int gfield, Aggregator.Op aop) {
    // TODO: some code goes here
    this.children = new OpIterator[] {child};
    this.aggregateIndex = afield;
    this.groupByIndex = gfield;
    this.aggregateOp = aop;

    Type groupType = gfield != Aggregator.NO_GROUPING ? child.getTupleDesc().getFieldType(gfield) : Type.STRING_TYPE; // 如果没有分组字段,则默认为字符串类型
    if (child.getTupleDesc().getFieldType(afield) == Type.INT_TYPE) {
    this.aggregator = new IntegerAggregator(gfield, groupType, afield, aop);
    } else {
    this.aggregator = new StringAggregator(gfield, groupType, afield, aop);
    }
    }
  • 返回分组的字段索引:public int groupField()

  • 返回分组的字段名:public String groupFieldName()

  • 返回聚合的字段索引:public int aggregateField()

  • 返回聚合的字段名:public String aggregateFieldName()

  • 返回聚合操作符:public Aggregator.Op aggregateOp()

  • 迭代器相关

    • 聚合操作的核心逻辑代码:public void open()

      将IntAggerator和StringAggerator聚合结果封装在resultIterator迭代器中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public void open() throws NoSuchElementException, DbException,
    TransactionAbortedException {
    // TODO: some code goes here
    super.open();
    children[0].open();
    while (this.children[0].hasNext()) {
    Tuple nextTuple = children[0].next();
    aggregator.mergeTupleIntoGroup(nextTuple); // 逐行进行聚合操作
    }
    resultIterator = aggregator.iterator();
    resultIterator.open();
    }
    • protected Tuple fetchNext()
    1
    2
    3
    4
    5
    6
    7
    8
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    // TODO: some code goes here
    if (resultIterator.hasNext()) {
    return resultIterator.next();
    } else {
    return null;
    }
    }
    • public void close()

    • public void rewind()

  • 获取聚合结果的字段属性(不是原tuples迭代器的字段属性):

    1
    2
    3
    4
    public TupleDesc getTupleDesc() {
    // TODO: some code goes here
    return aggregator.iterator().getTupleDesc();
    }

Exercise3

实现HeapPage、HeapFile、BufferPool中的insertTuple、deleteTuple

要实现对数据表数据的增添和删除,先从文件和物理页的层次开始。

Removing Tuples:要移除一个Tuple需要实现deleteTuple,Tuple包含RecordID能够帮助找到tuple存储的物理页,所以思路就是通过RecordID找到对应的物理页,并修改物理页的header。

Adding Tuples:对于文件层面,insertTuple方法负责添加一个tuple到HeapFile中,大致思路是:到页中找一个空闲的slot;如果HeapFile不存在这样的页就需要新建一个页,并添加新页到磁盘上。同时还要保证新添tuple的RecordID正确更新。

在insert和delete中BufferPool、HeapFile、HeapPage的作用:

对于BufferPool和HeapFile来说,是相互调用的关系:HeapFile调用BufferPool的getPage方法获取数据页:在页面存在于BufferPool中时直接从BufferPool获取;不存在时还需调用HeapFile的readPage方法。

1
2
3
4
5
6
7
8
9
public Page getPage(TransactionId tid, PageId pid, Permissions perm)
throws TransactionAbortedException, DbException {
if(!bufferPools.containsKey(pid)) {
DbFile file = Database.getCatalog().getDatabaseFile(pid.getTableId());
Page page = file.readPage(pid); // 调用HeapFile的readPage方法
bufferPools.put(pid, page);
}
return bufferPools.get(pid);
}

HeapPage类

参数:

  • 脏页标志位:private boolean dirtyFlag;

  • 产生脏页的事务id:private TransactionId dirtyTid;

方法:

  • 选择一个空slot插入tuple,同时修改该slot对应header的bitmap,表示该slot已被数据占用:public void insertTuple(Tuple t)

    插入数据的三个要点:① 设置slot的bitmap; ② 设置tuple在磁盘的位置recordID; ③ 向页面中插入真实数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public void insertTuple(Tuple t) throws DbException {
    // TODO: some code goes here
    // not necessary for lab1
    if (getNumUnusedSlots() == 0 || !t.getTupleDesc().equals(td)) {
    throw new DbException("page is full or tupleDesc is mismatch.");
    }
    for (int i = 0; i < numSlots; i++) {
    if (!isSlotUsed(i)) {
    markSlotUsed(i, true); // 设置slot的bitmap
    t.setRecordId(new RecordId(pid, i)); // 设置recordID
    tuples[i] = t; // 向页面插入数据
    return;
    }
    }
    }
  • 删除指定的tuple,同时修改slot对应的bitmap,表示该slot已为空:public void deleteTuple(Tuple t)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public void deleteTuple(Tuple t) throws DbException {
    // TODO: some code goes here
    int tid = t.getRecordId().getTupleNumber();
    boolean flag = false;
    for (int i = 0; i < tuples.length; i++) {
    if (t.equals(tuples[i])) {
    if (!isSlotUsed(i)) {
    throw new DbException("tuple slot is already empty");
    }
    markSlotUsed(i, false);
    tuples[tid] = null; // delete
    flag = true;
    }
    }
    if (!flag) {
    throw new DbException("the tuple is not on this page");
    }
    }
  • 修改脏页标志位:public void markDirty()

  • 如果该page是脏页则返回产生脏页的事务id:public TransactionId isDirty()

  • 修改page中的header,value为true表示在第i位设置为1,value为false表示在第i位设置为0:private void markSlotUsed(int i, boolean value)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private void markSlotUsed(int i, boolean value) {
    // TODO: some code goes here
    // not necessary for lab1
    int iTh = i / 8;
    int bitTh = i % 8;
    int onBit = (header[iTh] >> bitTh) & 1; // 判断该slot位是否被使用
    if (onBit == 0 && value) {
    // 未使用,现在使用
    header[iTh] += (byte) (1 << bitTh);
    } else if (onBit == 1 && !value) {
    // 已使用,现在未使用
    header[iTh] -= (byte) (1 << bitTh);
    }
    }

HeapFile类

方法:

  • 将page写入磁盘,同时清除脏页标志:public void writePage(Page page)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public void writePage(Page page) throws IOException {
    // TODO: some code goes here
    // not necessary for lab1
    PageId pageId = page.getId();
    int pageNo = pageId.getPageNumber();
    int offset = pageNo * BufferPool.getPageSize();
    byte[] pageData = page.getPageData();

    RandomAccessFile file = new RandomAccessFile(f, "rw");
    file.seek(offset);
    file.write(pageData);
    file.close();

    // 清除脏页
    page.markDirty(false, null);
    }
  • 将tuple插入到HeapFile的page中,如果HeapFile的page都满了还要在HeapFile中创建一个新的page:public List<Page> insertTuple(TransactionId tid, Tuple t)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public List<Page> insertTuple(TransactionId tid, Tuple t)
    throws DbException, IOException, TransactionAbortedException {
    // TODO: some code goes here
    // not necessary for lab1
    ArrayList<Page> pageList = new ArrayList<>();
    for (int i = 0; i < numPages(); i++) {
    HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid, new HeapPageId(getId(), i),
    Permissions.READ_WRITE);
    if (p.getNumUnusedSlots() == 0) {
    // 当前页面没有空闲槽位
    continue;
    }
    p.insertTuple(t); // 调用HeapPage的insertTuple方法
    pageList.add(p);
    return pageList;
    }
    // 没有找到合适的页面,需要创建新的页面
    BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(f, true));
    byte[] emptyData = HeapPage.createEmptyPageData();
    bos.write(emptyData);
    bos.close();
    // 加载入BufferPool
    HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid, new HeapPageId(getId(), numPages()-1),
    Permissions.READ_WRITE); // pgNo: numPages()方法的返回值是基于文件当前的大小动态计算的
    p.insertTuple(t);
    pageList.add(p);
    return pageList;
    }

    创建新page时将pageNo设置为numPages()-1而不是numPages()是因为numPages()方法的返回值是基于文件当前的大小动态计算的,默认新page的pageNo是最后一个。

    BufferedInputStream和BufferedOutputStream类就是实现缓冲功能的输入/输出流,使用带缓冲的输入/输出流,效率更高、速度更快。

    使用步骤:①创建FileOutputStream对象,构造方法中绑定要输出的目标文件; ②创建BufferOutputStream对象,使用write方法将数据写入内部缓冲区; ③使用BufferOutputStream的flush方法将缓冲区数据刷新到文件; ④释放资源。

    1
    2
    3
    BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream("bos.txt")) ;
    bos.write("hello".getBytes()); //写数据
    bos.close(); //释放资源
  • 将HeapFile中某一个page上的tuple从该page中删除:public List<Page> deleteTuple(TransactionId tid, Tuple t)

    1
    2
    3
    4
    5
    6
    7
    8
    public List<Page> deleteTuple(TransactionId tid, Tuple t) throws DbException,
    TransactionAbortedException {
    // TODO: some code goes here
    // not necessary for lab1
    HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid, t.getRecordId().getPageId(), Permissions.READ_WRITE);
    p.deleteTuple(t);
    return Collections.singletonList(p);
    }

根据tuple的RecordID属性也可以通过getPageId找到tuple所在的page页。

可以自行对比一下lab1和lab2中方法实现的区别:

BufferPool类

insert和delete这里BufferPool涉及到页面置换策略,其实也就是LRU策略。所以这部分实现放到Exercise5中一起来做。

Exercise4

实现Insert类、Delete类,对exercise3的功能进行封装

实现Insert和Delete操作符,用于修改磁盘上的页数据,这些操作符返回受影响的tuples个数(受影响的行数)。

Insert类

参数:

  • 执行插入操作的事务id:private TransactionId tid;

  • 待插入的tuples的迭代器:private OpIterator[] children;

  • 待插入的表id:private int tableId;

  • fetchNext方法会返回一个标识插入了多少行的tuple结果,tupleDesc就是该tuple的属性行:private TupleDesc tupleDesc;

    1
    this.tupleDesc = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"insertNums"});
  • 插入受影响行数的tuple结果(最主要的作用是避免fetchNext方法无限制的向下取,保证只调用一次):private Tuple insertRes;

方法:

  • 构造方法:public Insert(TransactionId t, OpIterator child, int tableId)

  • 执行插入操作,返回包含插入受影响行数的tuple:protected Tuple fetchNext()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    // TODO: some code goes here
    // 保证只调用一次,多次调用返回null
    if (insertRes != null) {
    return null;
    }
    int insertNums = 0;
    while (children[0].hasNext()) {
    try {
    Database.getBufferPool().insertTuple(tid, tableId, children[0].next());
    insertNums++;
    } catch (IOException e) {
    System.out.println("Insert tuples into database failed!");
    throw new RuntimeException(e);
    }
    }
    insertRes = new Tuple(tupleDesc); // 计算插入操作影响的行数
    insertRes.setField(0, new IntField(insertNums));
    return insertRes;
    }

Delete类

封装类的实现和Insert类完全相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void deleteTuple(Tuple t) throws DbException {
// TODO: some code goes here
int tid = t.getRecordId().getTupleNumber();
boolean flag = false;

for (int i = 0; i < tuples.length; i++) {
// if (t.equals(tuples[i])) {
if(tuples[i] != null && compareTuplesByFields(t, tuples[i])) {
if (!isSlotUsed(i)) {
throw new DbException("tuple slot is already empty");
}
markSlotUsed(i, false);
tuples[tid] = null; // delete
flag = true;
}
}
if (!flag) {
throw new DbException("the tuple is not on this page");
}
}

Exercise5

Eviction Policy有很多,可以自己选择,我选择的是最常见的LRU策略。

Lab1中在BufferPool满后会抛出异常,这里实现了BufferPool的页面置换策略,采取的是LRU的策略(详见LeetCode中的实现 https://leetcode.cn/problems/lru-cache/description/?envType=study-plan-v2\&envId=top-100-liked )。

BufferPool类

原先采用的是HashMap保存pageId和page的映射:private final Map<PageId, Page> bufferPools = new ConcurrentHashMap<>();,加入页面置换策略后改用自定义的LRUCache类(维护一个双向链表)来存储pageId和page的映射关系。

同时还涉及exercise3中BufferPool的插入、删除操作。

参数:

  • LRUCache类:private static class LRUCache

    为了实现LRU算法,需要维护一个双向链表LRUCache,用于记录每个PageId的访问顺序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    /**
    * 为了实现LRU算法,需要维护一个双向链表,用于记录每个PageId的访问顺序
    */
    private static class LRUCache {
    int capacity, size;
    ConcurrentHashMap<PageId, Node> map;
    // 头节点和尾节点:标志位无数据
    Node head = new Node();
    Node tail = new Node();

    public LRUCache(int capacity) {
    this.capacity = capacity;
    this.size = 0;
    map = new ConcurrentHashMap<>();
    head.next = tail;
    tail.prev = head;
    }

    public synchronized Page get(PageId key) {
    if (map.containsKey(key)) {
    Node node = map.get(key);
    // 定位到链表头
    moveToHead(node);
    return node.val;
    } else {
    return null;
    }
    }

    public synchronized void put(PageId key, Page val) {
    if (map.containsKey(key)) {
    // 更新value
    Node node = map.get(key);
    node.val = val;
    moveToHead(node);
    } else {
    Node newNode = new Node(key, val);
    map.put(key, newNode);
    // 添加到链表头
    addToHead(newNode);
    size++;
    if (size > capacity) {
    // 移除链表尾
    Node node = removeTail();
    map.remove(node.key);
    size--;
    }
    }
    }

    // 添加到链表头部
    private synchronized void addToHead(Node node) {
    node.prev = head;
    node.next = head.next;
    head.next.prev = node;
    head.next = node;
    }

    // 移动到链表头部
    private synchronized void moveToHead(Node node) {
    // 先从原位置删除node
    removeNode(node);
    // 再将node插入链表头部
    addToHead(node);
    }

    // 删除node节点
    private void removeNode(Node node) {
    node.prev.next = node.next;
    node.next.prev = node.prev;
    }

    // 删除尾部节点
    private Node removeTail() {
    Node node = tail.prev;
    removeNode(node);
    return node;
    }

    private synchronized int getSize() {
    return size;
    }

    private static class Node {
    PageId key;
    Page val;
    Node prev;
    Node next;

    public Node() {}

    public Node(PageId key, Page val) {
    this.key = key;
    this.val = val;
    }
    }

    public Set<Map.Entry<PageId, Node>> getEntrySet() {
    return map.entrySet();
    }
    }

方法:

  • 调用HeapFile的insertTuple方法插入元组,将返回的结果保存到BufferPool:public void insertTuple(TransactionId tid, int tableId, Tuple t)

    1
    2
    3
    4
    5
    6
    7
    8
    public void insertTuple(TransactionId tid, int tableId, Tuple t)
    throws DbException, IOException, TransactionAbortedException {
    // TODO: some code goes here
    // not necessary for lab1
    DbFile f = Database.getCatalog().getDatabaseFile(tableId);
    List<Page> updatePages = f.insertTuple(tid, t);
    updateBufferPool(updatePages, tid);
    }
  • 调用HeapFile的deleteTuple方法删除元组,将结果保存到BufferPool:public void deleteTuple(TransactionId tid, Tuple t)

    1
    2
    3
    4
    5
    6
    7
    8
    public void deleteTuple(TransactionId tid, Tuple t)
    throws DbException, IOException, TransactionAbortedException {
    // TODO: some code goes here
    // not necessary for lab1
    DbFile f = Database.getCatalog().getDatabaseFile(t.getRecordId().getPageId().getTableId());
    List<Page> updatePages = f.deleteTuple(tid, t);
    updateBufferPool(updatePages, tid);
    }
  • 更新BufferPool的存储:public void updateBufferPool(List<Page> updatePages, TransactionId id)

    注意将页面设置为脏页,因为在BufferPool中修改page后,和磁盘中的page不一致了。

    1
    2
    3
    4
    5
    6
    7
    public void updateBufferPool(List<Page> updatePages, TransactionId id) {
    for (Page page: updatePages) {
    page.markDirty(true, id); // 设置为脏页(因为在BufferPool中修改page后,和磁盘中的page不一致了)
    // update BufferPool
    lruCache.put(page.getId(), page);
    }
    }
  • 从BufferPool中移除页面:public synchronized void removePage (PageId pid)

    1
    2
    3
    4
    5
    6
    7
    public synchronized void removePage(PageId pid) {
    // TODO: some code goes here
    // not necessary for lab1
    if (pid != null) {
    lruCache.map.remove(pid);
    }
    }
  • 将某个页面如果是脏页则刷新到磁盘:private synchronized void flushPage(PageId pid)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private synchronized void flushPage(PageId pid) throws IOException {
    // TODO: some code goes here
    // not necessary for lab1
    Page page = lruCache.get(pid);
    if (page == null) {
    return;
    }
    TransactionId tid = page.isDirty();
    if (tid != null) {
    Page before = page.getBeforeImage();
    Database.getLogFile().logWrite(tid, before, page);
    Database.getCatalog().getDatabaseFile(pid.getTableId()).writePage(page);
    }
    }
  • 将所有脏页刷新到磁盘:public synchronized void flushAllPages()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public synchronized void flushAllPages() throws IOException {
    // TODO: some code goes here
    // not necessary for lab1
    for (Map.Entry<PageId, LRUCache.Node> group: lruCache.getEntrySet()) {
    Page page = group.getValue().val;
    if (page.isDirty() != null) {
    flushPage(group.getKey()); // 将不是脏页的页面刷新页面到磁盘
    }
    }
    }

要判断要删除的目标元组的字段是否和提供的相等,而不是地址相等: