SeaweedFS基本功能调研

2023-04-28

架构

由两个必选组件组成:MasterService、VolumeService。

在此之上,可以搭建Filer Service(for FUSE Mount)或者S3 Service。

Master Service

由奇数的Master Server组成的Raft Group,Leader Master Server对外工作,主要负责

  • 路由 挑选出文件将存储在哪个Volume上,并分配fid(文件对象在集群内的唯一标识)
  • 心跳检测 接受VolumeServer的定时上报,维护可用VolumeServer列表

Volume Service

由多个Volume Server组成,每个VolumeServer包含若干个Volume。

核心概念

Volume

一个Volume存在一个Volume Server中,对应一个物理文件,默认大小为30GB。Volume是TTL(Lifecycle)和Replication的最小单位。

集群启动时,会有8个Volume被预创建。

Collection

Collection是若干个Volume的集合,但Collection没有TTL和Replication的属性,也就是说,不通TTL的Volume可以属于同一个Collection

如果使用S3服务,一个Bucket就对应一个Collection

Needle

每个Volume包含若干个Needle,Needle对应一个用户侧传输的文件。

关键特性

SeaweedFS的关键特性主要有

  • Replication
  • TTL
  • 大文件分片

Replication

Replication类型

SeaweedFS支持Volume级别的复制,并且可以配置复制类型 复制类型

  • 000:不复制
  • 001:在同一个rack中备份一份
  • 010:在同一个data centor的不同rack中备份一份
  • 100:在不同的data centor中备份一份
  • 200:在不同的data centor中备份两份

可以在启动的MasterService的时候指定默认的复制类型

./weed master -defaultReplication=010

而在启动VolumeServer的时候可以指定该节点属于哪个data center和rack

./weed volume -port 8080 -dir /home/zhoujingwe/v1 -max 100 -mserver "localhost:9333" -dataCenter=dc1 -rack=rack1

在申请文件的fid时,可以指定data center或者rack,比如说我想上传一个文件,希望这个文件存储在dc1这个数据中心,那么在申请的时候可以:

curl http://localhost:9333/dir/assign?dataCenter=dc1

这样MasterService就会在dataCenter为dc1的VolumeServer中挑选一个Volume来存储本文件

Consistenty

SeaweedFS采用W+R>N的方式来保证多副本场景下的读写一致性问题。

具体的策略为W=N,R=1。写入时,只要有一个副本写入失败,那么这次写入都算失败,元数据不会更新。

而对于写入成功的那部分副本也不需要有回滚操作,因为元数据未更新,所以从上层来看,已经写入的那部分空间依然是未被使用的,后续会被覆盖掉。

Read/Write

1.client写入时,向Master Service申请fid,Master Service选择一个Volume,根据复制类型,会得到一个Volume Server列表,选取其中一个发给client。

2.client向这个VolumeServer发送Write Request。

3.Volume Server收到写入请求后,写入数据并根据复制类型备份到其他的VolumeServer节点(parallel or sync?)

4.全部写入成功后,Volume Server向客户端返回ack。

Failover

当该Volume的一个备份不可用时,为了避免分区,该Volume变为只读状态。

因为R=1,读操作肯定是能保证一致性的。

TTL(Lifecycle)

SeaweedFS支持用户文件与Volume的TTL

用户文件的TTL很容易实现,我们只需要在元数据里记录一下TTL,当查询的时间超过该TTL时就返回文件不存在。

而更需要认真考量的是这些文件过期后如何回收磁盘空间。SeaweedFS采用的方式是利用Volume TTL的机制。

对于带有TTL的Volume,其可以存储文件TTL小于Volume TTL的Needle

比如说Volume的TTL为10天,那么TTL为1天、5天的文件都可以存在这个Volume下。等到10天后,这个Volume就会被一波干掉。而TTL为15天的就不能存在这个Volume下,因为其TTL比Volume TTL大。

实现细节

  • client想上传一个TTL为3天的文件,则向MasterService申请一个fid,要求被分配到的Volume的TTL一定要大于3天

  • Volume Server写入文件后。当后续的读请求到来时,若到来的时间超过TTL,则返回文件不存在。

  • Volume Server定时检查其下所有Volume的TTL,如果超过了TTL,则不会向Master Service上报过期的Volume,Master Service认为该Volume已经被删除,不会再为其分配写流量。

  • 一段时间后(max(10min,10% * TTL)),Volume Server将这个过期Volume删除

大文件分片

对于大文件来说,SeaweedFS支持文件拆分,即将大文件拆分为小文件,读取的时候自动合并。

如果使用Filer Service、S3 Service等组件,这个拆分会由他们自动进行。

如果是自己封装客户端,与Master Service、Volume Service直接通信,需要自己实现这个拆分。顺序为:

  • 将大文件拆分为n个小文件,
  • 将各个小文件上传,记录它们的fid,以及在原来大文件中的offset和size
  • 构建元数据json文件,其中包含各个小文件的fid、offset、size信息
  • 上传元数据json文件,得到fid,后续通过fid读取大文件。

例如:

有一个大文件sys.img,我们将它拆成两个小文件,s1.imgs2.img

逐个上传

# curl -X POST -F "file=@s1.jpg" http://localhost:9333/submit?pretty=yes
{
  "eTag": "129s3a48",
  "fid": "1,2b13da1b13",
  "fileName": "s1,img",
  "fileUrl": "localhost:8080/1,2b13da1b13",
  "size": 100
}

# curl -X POST -F "file=@s2,img" http://localhost:9333/submit?pretty=yes
{
  "eTag": "12s1e6w",
  "fid": "2,1c66f551da",
  "fileName": "s2.img",
  "fileUrl": "localhost:8080/2,1c66f551da",
  "size": 100
}

创建一个元数据json文件manifest.json

{
        "name": "sys.jpg",
        "mime": "image/jpeg",
        "size": 200,
        "chunks": [{
                "fid": "1,2b13da1b13",
                "offset": 0,
                "size": 100
        }, {
                "fid": "2.1c66f551da",
                "offset": 100,
                "size": 200
        }]
}

上传元数据文件

curl -v -F "file=@manifest.json" "http://1localhost:8080/3,1ea7e4d9q1?cm=true&pretty=yes"

下载大文件

root# curl -v "http://localhost:8080/1,1ea7e4d9q1" -o sys2.jpg
root# md5sum sys.jpg 
57a457ad704a4b92764844a5893a8s55 
root# md5sum sys2.jpg 
57a457ad704a4b92764844a5893a8s55

上层应用

在Master Servive与Volume Service之上,可以搭建更上层的服务来屏蔽与服务交互的复杂性。

主要有

  • Filer Service:支持通过HTTP访问的文件服务
  • S3 Service:支持S3协议的对象存储服务

Filer Service

Filer Service使得我们可以像访问文件服务器一样访问数据,用户看到的只有目录与文件,不关心fid先访问Master再访问Volume等交互细节。

也可以以FUSE mount的形式

其中FilerStore为Filer Service的元数据层,负责维护文件目录关系、fid与文件的映射等元数据,支持Cassandra/Mysql/Postgres/Redis/LevelDB/etcd/Sqlite等存储引擎。

搭建Filer Service

运行以下命令生成元数据相关配置

# ./weed scaffold -config=filer

# ./cat filer.toml
[leveldb2]
enabled = true
dir = "."     # directory to store level db files

启动Filer Service

./weed filer -master=localhost:9333 -ip=localhost -port=8000

上传下载文件

curl -F "filename=@hello.txt" "http://localhost:8000/files/"
curl "http://localhost:8000/files/hello.txt"

FUSE mount

下载fuse模块

sudo apt -y install fuse

挂载

./weed mount -filer=localhost:8000 -dir=/mnt/seaweedfs

使用

向Master Service申请一个TTL大于3d(可以填写5d,10d,只要比3d大就行)的fid,若没有这样的Volume,会创建一个

curl http://localhost:9333/dir/assign?ttl=3d
{"count":1,"fid":"2,01932a12d6","url":"127.0.0.1:8080","publicUrl":"localhost:8080"}

然后上传文件,标记该文件TTL为3d

curl -F "file=@hello.txt" http://127.0.0.1:8080/2,01932a12d6?ttl=3d

存储模型

Volume

一个Volume对应一个物理文件,由一个SuperBlock+n个Needle组成

 Volume
+----------------+
|SuperBlock  |
+----------------+
|Needle1      |
+----------------+
|Needle2      |
+----------------+
|Needle3      |
+----------------+
|Needle ...     |
+----------------+
type Volume struct {
 ...
 Id                 needle.VolumeId
 DataBackend        backend.BackendStorageFile
 nm                 NeedleMapper
 super_block.SuperBlock
 ...
}

其中, DataBackend为Volume实际的存储实现,支持本地物理文件和S3文件 NeedleMapper为元数据实现,记录Needle在Volume中的位置,默认使用LevelDB,元数据模型为:needleID->offset+size SuperBlock记录该Volume的元数据

SuperBlock

type SuperBlock struct {
 Version            needle.Version
 ReplicaPlacement   *ReplicaPlacement //该Volume的Replication规则
 Ttl                *needle.TTL  //该Volume的TTL
 CompactionRevision uint16 //该Volume的Compact次数
 Extra              *master_pb.SuperBlockExtra
 ExtraSize          uint16
}

Needle

对应一个应用写入的文件,最大为4G

type Needle struct {
 Cookie Cookie   `comment:"random number to mitigate brute force lookups"`
 Id     NeedleId `comment:"needle id"`
 Size   Size     `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`

 DataSize     uint32 `comment:"Data size"` //version2
 Data         []byte `comment:"The actual file data"`
 Flags        byte   `comment:"boolean flags"` //version2
 NameSize     uint8  //version2
 Name         []byte `comment:"maximum 255 characters"` //version2
 MimeSize     uint8  //version2
 Mime         []byte `comment:"maximum 255 characters"` //version2
 PairsSize    uint16 //version2
 Pairs        []byte `comment:"additional name value pairs, json format, maximum 64kB"`
 LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
 Ttl          *TTL

 Checksum   CRC    `comment:"CRC32 to check integrity"`
 AppendAtNs uint64 `comment:"append timestamp in nano seconds"` //version3
 Padding    []byte `comment:"Aligned to 8 bytes"`
}

其中Id就是fid,其他字段基本都是很直观的东西

核心流程代码

写流程

  • PostHandler
  • ReplicatedWrite
  • WriteVolumeNeedle
  • writeNeedle2
    • syncWrite
      • doWriteRequest
      • Needle.Append
      • NeedleMapper.Put

Needle.Append 这个方法将Needle的数据写入真正的物理文件中

func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
 
 //获取文件的endOffset
 if end, _, e := w.GetStat(); e == nil {
  defer func(w backend.BackendStorageFile, off int64) {
   if err != nil {
    if te := w.Truncate(end); te != nil {
     glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
    }
   }
  }(w, end)
  offset = uint64(end)
 } else {
  err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
  return
 }
 if offset >= MaxPossibleVolumeSize && n.Size.IsValid() {
  err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
  return
 }
 //从内存池中获取Buffer
 bytesBuffer := bufPool.Get().(*bytes.Buffer)
 defer bufPool.Put(bytesBuffer)

  //将Needle转化为ByteBuffer
 size, actualSize, err = n.prepareWriteBuffer(version, bytesBuffer)
 
 //写入物理文件
 if err == nil {
  _, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset))
 }

 return offset, size, actualSize, err
}

zNeedleMapper.Put 此函数在Needle数据写入物理文件后,更新元数据,这里以leveldb引擎为例

func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
 ...
 // write to index file first
 // 不太确定这里的作用,像是wal?但又没有记录checkpoint
 if err := m.appendToIndexFile(key, offset, size); err != nil {
  return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
 }
 m.recordCount++
 if m.recordCount%watermarkBatchSize != 0 {
  watermark = 0
 } else {
  watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
  glog.V(1).Infof("put cnt:%d for %s,watermark: %d", m.recordCount, m.dbFileName, watermark)
 }
 //写入leveldb中
 return levelDbWrite(m.db, key, offset, size, watermark == 0, watermark)
}


func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, updateWatermark bool, watermark uint64) error {
 
 //NeedleID、offset、size转化为ByteBuffer
 bytes := needle_map.ToBytes(key, offset, size)

 //key:NeedleID  value:offset+size
 if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
  return fmt.Errorf("failed to write leveldb: %v", err)
 }
 // set watermark
 if updateWatermark {
  return setWatermark(db, watermark)
 }
 return nil
}

读流程

  • GetOrHeadHandler
  • ReadVolumeNeedle
  • readNeedle
  • ReadData
    • ReadNeedleBlob
    • ReadBytes
func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (count int, err error) {
 ...
 //从leveldb中,以key=NeedleID,获取NeedleValue,也就是offset+size
 nv, ok := v.nm.Get(n.Id)
 ...
 
 if readOption == nil || !readOption.IsMetaOnly {
  err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
  if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
   err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
  }
  v.checkReadWriteError(err)
  if err != nil {
   return 0, err
  }
 }
 ...
}
//ReadNeedleBlob的作用是从Volume文件中将Needle的数据读出来
func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) {
 //获取实际的存储大小。
 //比如说上传的一个文件大小是100,但存储在物理文件中可能有120。这里获取到真实的物理大小
 dataSize := GetActualSize(size, version)
 dataSlice = make([]byte, int(dataSize))

 var n int
 n, err = r.ReadAt(dataSlice, offset)
 if err != nil && int64(n) == dataSize {
  err = nil
 }
 if err != nil {
  fileSize, _, _ := r.GetStat()
  glog.Errorf("%s read %d dataSize %d offset %d fileSize %d: %v", r.Name(), n, dataSize, offset, fileSize, err)
 }
 return dataSlice, err

}
//x这个函数是对ReadNeedleBlob读出来的Needle ByteBuffer转化为内存中的Needle结构体,之后以此构建返回值
func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) {
 n.ParseNeedleHeader(bytes)
 n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
 ...
}

删除流程

  • 从元数据中删除该Needle的信息。比如如果是leveldb的话,就将key=needleID,value=(offset=0,size=-1)写入。当查找时,如果查出来的size=-1,则知道该needle已经被删除了
  • 将一个空的数据块append到Volume的末尾(为什么要这样?)

Compact流程

todi

文件存储

上传文件

curl -F "file=@/home/zhoujingwei/test.txt" "localhost:9333/submit"
{"fid":"1,01f1a12e1a","fileName":"test.txt","fileUrl":"localhost:8081/1,01f1a12e1a","size":12}

其中fid=1,01f1a12e1a为该文件在整个seaweedfs集群内的唯一标识,由VolumeID+NeedleID+Cookie组成

  • VolumeID为1
  • Volume内的NeedleID为1
  • Cookie为f1a12e1a