计数器
内核维护了各种统计数据,成为计数器,用于对事件计数。通常计数器为无符号的整数型,发生时间时递增。例如,有网络包接收的计数器,有磁盘IO发生额计数器,也有系统调用执行的计数器。
系统级别
vmsat 虚拟内存和物理内存的统计
mpstat
iostat
netstat
进程级别
ps
top
pmap
内核维护了各种统计数据,成为计数器,用于对事件计数。通常计数器为无符号的整数型,发生时间时递增。例如,有网络包接收的计数器,有磁盘IO发生额计数器,也有系统调用执行的计数器。
vmsat 虚拟内存和物理内存的统计
mpstat
iostat
netstat
ps
top
pmap
我的环境是:
Ubuntu 14.04 (Debian)
如果是RPM的话,安装方法不同,需要参考如下。
![ceph](http://docs.ceph.com/docs/master/install/install-ceph-gateway/)
由于Apache的版本不同,配置FASTCGI的时候是不一样的,所以需要注意一下。在某些版本安装Apache2.4(如 RHEL 7, CentOS 7, 或者Ubuntu 14.04 Trusty) 这些版本的Linux系统,mod_proxy_fcgi是已经有的。所以当安装了Apache或者httpd(RPM-based), mod_proxy_fcgi 就已经可以使用的了。而在Apache2.2版本的中(如 RHEL 6, CentOS 6,Ubuntu 12.04 Precise) mod_proxy_fcgi 是另外的安装包,在 RHEL6/CentOS6, 在EPEL6 版本中 可以使用 yum install mod_proxy_fcgi. Ubuntu 12.04, 似乎又问题。
sudo apt-get install apache
Apache 的配置文件在/etc/apache2/apache2.conf, 需要添加ServerName
即主机的全称域名FQDN.(eg, hostname -f).
加mod_proxy_fcgi模块
sudo a2enmod proxy_fcgi
然后启动Apache
sudo service apache2 start
有些 REST 客户端默认使用HTTPS. 所以需要考虑为Apache启动SSL。
安装相关依赖:
sudo apt-get install openssl ssl-cert
启动SSL模块:
sudo a2enmod ssl
生成证书:
sudo mkdir /etc/apache2/ssl
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout /etc/apache2/ssl/apache.key -out /etc/apache2/ssl/apache.crt
重启Apache:
sudo service apache2 restart
安装 Radosgw:
sudo apt-get install radosgw
要配置Ceph对象网关要求要安装好Ceph集群和带有FastCGI模块Apache。
Ceph对象网关相当于Ceph集群的客户端。作为Ceph的客户端需要如下:
1. 一个名字作为网关实例。 (我用gateway)
2. 一个在keyring中拥有一定权限的存储集群的用户名。
3. Ceph集群中有存储数据的资源池 Pool
4. 网关实例的数据目录
5. Ceph 配置文件中有实例的相关信息
6. 一个用于Apache和FastCGI交互的配置文件
每一个实例必须要有一个用户名和key才可以访问Ceph集群。
在下来的步骤中,我们用管理节点来创建key。
* 首先是创建一个客户端用户名和key。
* 然后将key加入到集群中。
* 最后我们将key ring 放到网关实例所在的节点。
如果是使用docker部署ceph的话,需要在下面使用ceph-authool, ceph auth,
ceph 前面加上docker exec mon ceph
sudo ceph-authtool --create-keyring /etc/ceph/ceph.client.radosgw.keyring
sudo chmod +r /etc/ceph/ceph.client.radosgw.keyring
需要为keyring添加可读权限,启动radosgw时,可能会报错。
所以client.radosgw跟就是gateway.
sudo ceph-authtool /etc/ceph/ceph.client.radosgw.keyring -n client.radosgw.gateway --gen-key
sudo ceph-authtool -n client.radosgw.gateway --cap osd 'allow rwx' --cap mon 'allow rwx' /etc/ceph/ceph.client.radosgw.keyring
需要将key加入到Ceph集群中。
sudo ceph -k /etc/ceph/ceph.client.admin.keyring auth add client.radosgw.gateway -i /etc/ceph/ceph.client.radosgw.keyring
如果在同一个主机上就不用了。
ceph auth list
ceph osd pool create {pool_name} {pg_num} {pgp_num}
查看是否创建成功。
ceph osd lspools
在管理节点为Ceph的配置文件添加对象网关的配置。对象网关的配置要求你区别对象网关的实例。在安装对象网关deamon时需要指定主机名,keyring, 为FastCGI的socket路径,日志文件。不同Apache版本配置时参数是不一样的。我的环境是Ubuntu 14.04, Apache 2.4.7. 在Apache2.2和Apache2.4的早期版本(RHEL 6, Ubuntu 12.04, 14.04), 在/etc/ceph/ceph.conf添加:
[client.radosgw.gateway]
host = {hostname}
keyring = /etc/ceph/ceph.client.radosgw.keyring
rgw socket path = ""
log file = /var/log/radosgw/client.radosgw.gateway.log
rgw frontends = fastcgi socket_port=9000 socket_host=0.0.0.0
rgw print continue = false
注: Apache2.2和Apache2.4的早期版本不是使用Unix Domain Sockets而是localhost TCP。
下面是针对于其他版本下的,主要的区别就是支持UDS, 所以配置文件不同。
在Apache2.4.9后的版本(RHEL 7, CentOS7),在/etc/ceph/ceph.conf添加:
[client.radosgw.gateway]
host = {hostname}
keyring = /etc/ceph/ceph.client.radosgw.keyring
rgw socket path = /var/run/ceph/ceph.radosgw.gateway.fastcgi.sock
log file = /var/log/radosgw/client.radosgw.gateway.log
rgw print continue = false
注:
Apache2.4.9支持Unix Domain Socket(UDS), 但是Ubuntu 14.04 安装的是Apache2.4.7.
所以不支持UDS, 而是配置为localhost TCP.
{hostname} 是提供网关服务的短的主机域名(hostname -s 的输出)。
由于Ceph中默认的日志级别不高,所以可以在ceph.conf中添加:
[global]
debug ms = 1
debug rgw = 20
但是这样日志量会有点大, 如果是docker部署的话,docker的日志现在没有支持清除。
配置文件默认不用创建网关的数据目录,所以需要手动创建。
sudo mkdir -p /var/lib/ceph/radosgw/ceph-radosgw.gateway
sudo /etc/init.d/radosgw start
启动失败可以,在/var/log/radosgw/client.radosgw.gateaway.log查看日志。
Apache版本不同和Linux系统不同,配置文件不同,要注意区别。
Ubuntu是在/etc/apache2/conf-available, 为对象网关创建rgw.conf.
* 在Apache2.2和Apache2.4的早期版本(RHEL 6, Ubuntu 12.04, 14.04), 我的环境是Ubuntu 14.04,
所以添加:
<VirtualHost *:80>
ServerName localhost
DocumentRoot /var/www/html
ErrorLog /var/log/httpd/rgw_error.log
CustomLog /var/log/httpd/rgw_access.log combined
RewriteEngine On
RewriteRule .* - [E=HTTP_AUTHORIZATION:%{HTTP:Authorization},L]
SetEnv proxy-nokeepalive 1
ProxyPass / fcgi://localhost:9000/
</VirtualHost>
下面是针对于其他版本下的,主要的区别就是支持UDS, 所以配置文件不同。
在Apache2.4.9后的版本(RHEL 7, CentOS7)添加:
<VirtualHost *:80>
ServerName localhost
DocumentRoot /var/www/html
ErrorLog /var/log/httpd/rgw_error.log
CustomLog /var/log/httpd/rgw_access.log combined
RewriteEngine On
RewriteRule .* - [E=HTTP_AUTHORIZATION:%{HTTP:Authorization},L]
SetEnv proxy-nokeepalive 1
ProxyPass / unix:///var/run/ceph/ceph.radosgw.gateway.fastcgi.sock|fcgi://localhost:9000/
</VirtualHost>
重启Apache:
*sudo service apache2 restart *
在网关主机上执行如下命令:
sudo radosgw-admin user create --uid="testuser" --display-name="First User"
输出的结果如下:
{"user_id": "testuser",
"display_name": "First User",
"email": "",
"suspended": 0,
"max_buckets": 1000,
"auid": 0,
"subusers": [],
"keys": [
{ "user": "testuser",
"access_key": "I0PJDPCIYZ665MW88W9R",
"secret_key": "dxaXZ8U90SXydYzyS5ivamEP20hkLSUViiaR+ZDA"}],
"swift_keys": [],
"caps": [],
"op_mask": "read, write, delete",
"default_placement": "",
"placement_tags": [],
"bucket_quota": { "enabled": false,
"max_size_kb": -1,
"max_objects": -1},
"user_quota": { "enabled": false,
"max_size_kb": -1,
"max_objects": -1},
"temp_url_keys": []}
其中access_key和secret_key需要用来访问S3.
要用S3访问网关,需要提供aws_access_key_id 和aws_secret_access_key和网关的hostname。
新建s3test.py, 加入:
import boto
import boto.s3.connection
access_key = 'I0PJDPCIYZ665MW88W9R'
secret_key = 'dxaXZ8U90SXydYzyS5ivamEP20hkLSUViiaR+ZDA'
conn = boto.connect_s3(
aws_access_key_id = access_key,
aws_secret_access_key = secret_key,
host = '{hostname}',
is_secure=False,
calling_format = boto.s3.connection.OrdinaryCallingFormat(),
)
bucket = conn.create_bucket('my-new-bucket')
for bucket in conn.get_all_buckets():
print "{name}\t{created}".format(
name = bucket.name,
created = bucket.creation_date,
)
运行python 脚本:
python s3test.py
输出结果:
my-new-bucket ......
如果出现这种的错误,一种可能的情况就是Ceph集群没有启动,或者集群的状态
不是HEALTH_OK, 所以启动时候失败。
还有就是由于配置文件没有在 /etc/ceph/ 目录下,要先确认/etc/ceph/目录下有
ceph.conf 和keyring文件, 而且都要有可读的权限。
这种情况,通过查看详细的日志,可以发现是由于认证失败. 我们上面在Ceph的管理
节点上创建了keyring和用它创建gateway 用户和gateway的key.我们要确认Ceph集群
真的有加入认证列表中. 用如下命令查看:
*sudo ceph auth list *
结果应该要有 client.radsogw.gateway
这个可能的一个原因就是漏掉禁用默认site的操作。
解决的方法就是执行:
** sudo a2dissite 000-default **
另外一个可能就是mod_proxy_fcgi没有启动,或者没有配置错误。
没有启动,只要启动就可以了:
sudo a2enmod proxy_fcgi
配置错误就是根据Linux的版本和Apache的配置的不同,修改rgw.conf和网关的配置文件。
可能的原因是S3生成的key中有转义符号'/',然后验证的解析错误,所以验证失败。
解决的方法就是手动处理或者重新生成key, 手动处理就是将转义符号去掉就可以了。
可能原因的是端口错误,可以先用curl或者浏览器查看一下,
Apache和mod_proxy_fcgi是不是都是正常,curl {gateway hostname},
如果是如下的情况, 说明正常。
如果不是可能是配置rgw.conf, 端口不是80, 会出现这种情况。
我的环境是: ubuntu LTS 14.04
安装docker在ubuntu下,可以直接使用apt-get,但是默认安装的不是最新版。 安装docker最新版的如下:
curl -sSL https://get.daocloud.io/docker | sh
由于在国内网络原因,如果直接从docker Hub下载镜像非常慢,而且容易中间出错。 所以用国内的docker加速器,我用的是daocloud。daocloud注册一个账号,然后 再用docker安装daocloud的加速器,这样每次下载镜像的时候,直接使用dao pull 就可以了,速度快很多。
安装docker的加速器:
curl -sSL https://get.daocloud.io/daomonit/install.sh | sh -s dec4a3298f2e8da43169f09f18cd5c26ee04b165
安装非常简单,只需要使用 dao pull ceph/mon 利用 docker images 查看镜像是否下载好了。 sudo docker images
构造monitor容器并运行
sudo docker run -tid --name=mon --net=host
-e MON_NAME=mymon -e MON_IP=192.168.0.136
-e CEPH_PUBLIC_NETWORK=192.168.0.0/24
-v /etc/ceph:/etc/ceph ceph/mon
sudo docker run -tid --name=mon --net=host -e MON_NAME=mon3 -e MON_IP=192.168.56.3 -e CEPH_PUBLIC_NETWORK=192.168.56.0/24 -v /etc/ceph:/etc/ceph ceph/mon
--name 为在docker镜像cpeh/mon设置一个名字,方便使用 --net ceph/mon 设置网络,为host, none, bridge等 -e 设置环境变量相关的参数 -v 设置挂载卷
查看当前正在运行的容器
配置文件在/etc/ceph/ceph.conf
添加
osd pool default size = 1 这个设置的是备份数
osd pool default mini size = x 如何上一行为1,下面就可以不用设置了。
osd pool default pg num = 32 这个是设置每个资源池pool的放置组数目
osd pool default pgg num = 32
osd crush chooseleaf num = 0 设置的是集群节点(单集群设置为0)
注意:
如果是单集群,osd crush chooseleaf num 要设置为0
osd pool default size 为备份数,跟集群的存储节点有关,要求存储节点数目
不小于备份数。
存储节点(storage node): 指的是集群中osd所在的host数目,也就是说与osd
的数量没有关系,而是与osd的host有关系。如果多个osd都是在同一个host下,
那么是属于同一个storage node的。可以用 docker exec mon ceph osd tree
查看。
和MONITOR类似,使用 dao pull ceph/osd
在构建osd前需要现在集群上创建osd,这样monitor才能将osd加入到集群中。 sudo docker exec mon ceph osd create
构造osd容器并运行
sudo docker run -tid --name=osd0 --net=host
-e CLUSTER=ceph -e WEIGHT=1.0
-e MON_NAME=mymon -e MON_IP=192.168.0.136
-v /etc/ceph:/etc/ceph
-v /opt/osd/0:/var/lib/ceph/osd/ceph-0 ceph/osd
sudo docker run -tid --name=osd0 --net=host -e CLUSTER=ceph -e WEIGHT=1.0 -e MON_NAME=mon1 -e MON_IP=192.168.56.1 -v /etc/ceph:/etc/ceph -v /opt/osd/0:/var/lib/ceph/osd/ceph-0 ceph/osd 这里我们osd容器的net为host,就是为localhost,为一个存储节点。 -e 设置的是osd的环境变量先关的。监控的IP等信息。 -v /opt/sod/0 是osd挂载的地方。 如果需要增加osd容器,则需要修改上面的参数。将0改为其他数字。
使用docker查看容器的日志
sudo docker logs -f container
如查看ceph的日志:
sudo docker logs -f mon
查看Ceph集群的状态
查看时集群的要是HEALTH_OK, 才可以。 由于集群刚开始创建是会默认创建一个rbd pool,而这个pool是默认为三个副本。 所以如果集群中没有三个存储节点,那么集群的状态就p一直是HEALTH_WARN. 可以用docker exec mon ceph osd dump查看osd中每个pool的情况。
上述情况的解决方法又两种,一种就是构造多两个存储节点(不是指osd,host要不同). 或者直接将这个rbd pool 删除掉,然后再创建新的pool。由于我们上的配置文件, 已经修改为每个pool默认为一个副本,所以就正常了。
创建或者删除pool
sudo docker exec mon ceph osd create pool_name pg_num
sudo docker exec mon ceph osd delete pool_namm pool_namm --i-really-want-to-do-it
从云存储的构建方面,云存储是在云计算(cloud computing)概念上延伸和发展 出来的一个新的概念,是一种新兴的网络存储技术,是指通过集群应用、网络技术 或分布式文件系统等功能,将网络中大量各种不同类型的存储设备通过应用软件集 合起来协同工作,共同对外提供数据存储和业务访问功能的一个系统。与传统的存 储设备相比,云存储不仅仅是一个硬件,而是一个网络设备、存储设备、服务器、 应用软件、公用访问接口、接入网、和客户端程序等多个部分组成的复杂系统,并 且通过应用软件来对外提供数据存储和业务访问服务。
从面向用户的服务方面,云存储不仅仅是存储,而是提供一种服务。就如同云 状的广域网和互联网一样,云存储对用户来讲,不是指某一个具体的设备,而是指 一个由许许多多个存储设备和服务器所构成的集合体。用户使用云存储,并不是使 用某一个存储设备,而是使用整个云存储系统带来的一种数据访问服务。用户可以 通过网络连接云端存储资源,实现用户的数据在云端随时随地的存储。
云存储的结构模型分为四层,自上而下为: 用户访问层,应用接口层,基础管理 层,存储层。
用户访问层 通过用户访问层,任何一个授权用户都可以使用应用接口层提供的公用应用接口来登 录云存储平台,享受云存储服务。
应用接口层 应用接口层是云存储最灵活多变的部分。不同的云存储运营单位可以根据实际业务类 型,开发不同的应用服务接口,提供不同的应用服务。
基础管理层 基础管理层是云存储最核心的部分,也是云存储中最难以实现的部分。基础管理层通 过集群、分布式文件系统和网格计算等技术,实现云存储中多个存储设备之间的协同 工作,使多个的存储设备可以对外提供同一种服务,并提供更大更强更好的数据访问 性能。
存储层 存储层是云存储最基础的部分。存储设备可以是FC光纤通道存储设备,可以是NAS和 iSCSI等IP存储设备,也可以是 SCSI或SAS等 DAS存储设备。云存储中的存储设备往 往数量庞大且分布多不同地域,彼此之间通过广域网、互联网或者 FC光纤通道网络连 接在一起。存储设备之上是一个统一存储设备管理系统,可以实现存储设备的逻辑虚 拟化管理、多链路冗余管理,以及硬件设备的状态监控和故障维护。
海量存储,弹性扩展 云存储具备海量存储的特点,同时拥有很好的可扩展性能,因此支持并可根据需要提供 线性扩展至 PB 级存储服务。
低成本 提供按需服务,户可以花更大的成本和更多的费用享受更多的资源和服务。
安全性 通过采用保护数据安全的策略,如采取可擦除代码(Erasure Code,EC)、安全套接 层(Secure Sockets Layer,SSL),访问控制列表(AccessControl List,ACL)等多重 保护策略和技术,保证数据的安全性。
可靠性,可用性 数据在云存储中是分布存放的,同时也采用相关的备份技术和算法,从而保证了数 据可靠性、数据可恢复性和系统弹性可扩展性等特点,同时确保硬件损坏、数据丢失等 不可预知的条件下的数据可用性和完整性,并且服务不中断。
Ceph的消息处理是采用发布订阅的设计模式,其中Messenger作为消息的发布者, Dispatcher和其子类为消息的订阅者。
Messenger 为消息的发布者,根据处理的网络包的处理不同其接口为不同的子类 实现。Messenger的功能就是作为消息的发布者,从Pipe读取到消息后,将信息 转发给Dispatcher和其子类处理。
Simple Messenger为Messenger的子类,其接口的实现。Simple Messenger处理 网络包的方式为对于为每个请求新建线程处理,为经典的生产者消费者问题。
Dispatcher为订阅者的基类,不同的子类实现功能不同。Dispatcher在初始化时 注册到发布者Messenger上,接受到Messenger的消息后再进行处理。
Pipe主要是消息的读取和发送,主要有两个组件Reader和Writer,分别用来处理 socket上的读取和发送,不仅是消息还有ACK和其他。Reader和Writer都是Thread 的子类,也就是说每次处理消息的的时候也会新建者两个线程。消息经过Reader 处理后,Messenger会通知已注册的Dispatcher处理,处理完后将回复的消息放到 Messenger::Pipe::out_q队列中,然后Writer再处理发送。
Accepter监听请求,监听到请求后,Messenger新建Pipe处理。
DispatchQueue包含了需要处理的消息,通过消息的优先级组织好,再通过DisapatchThread分发给Dispatcher处理。DispatchThread启动后,进入 等待状态,当有消息被放入队列时被激活。
Messenger 初始化时,由于Messenger是抽象类,不能直接实例化。通过调用create
方法实例化一个Simple Messenger.
// 调用create方法实例化一个Simple Messengern
oMessenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(rank), "mon", 0);
// Messenger: bind() 绑定ip地址
// Simple Messenger: bind()
// Accepter: bind()
// create socket and start listening
err = msgr->bind(ipaddr);
Dispatcher的初始化,以Mon为例来解析。Mon初始化时,向Messenger注册, 然后
Messenger准备好。 在Messenger:ready()中DispatchQueue 启动线程,开始接收
需要缓存的消息。Accepter 启动线程,开始监听新的请求。
// start monitor
mon = new Monitor(g_ceph_context, g_conf->name.get_id(), store,
msgr, &monmap);
// Monitor: init()
// Messenger: add_dispatcher_tail()
// Messenger: ready()
// DispatchQueue: start() create Dispathce Thread
// Accepter: start() create thread to handle entry()
// Accepter: entry() socket accept message and messenger
// add a accept pipe to handle it
// Meessenger: add_accept_pipe(sd) start reader thread
// Pipe: start_reader()
// Pipe: reader()
mon->init();
Dispathcer(Mon)和Messenger初始化后, Messenger在ready()启动DispatchQueue 线程和Accepter线程。DispatchQueue: in_q 开始缓存需要处理的消息,然后交给 Dispatcher处理,Dispatcher处理后再将回复的消息放到 out_q。 Accepter启动线 程,开始在entry()接收请求和接收到消息后Messenger创建一个Pipe和启动Pipe: start_reader()读取消息,将消息放入DispatchQueue: in_q。
void SimpleMessenger::ready()
{
// 启动DispatchQueue线程
dispatch_queue.start();
// 启动Accepter线程
lock.Lock();
if (did_bind)
accepter.start();
lock.Unlock();
}
Accepter启动线程,线程都是由Thread为基类,Thread:create()调用phtread_
create新建线程,start_routine函数为entry().
int Accepter::start()
{
// start thread, handle entry()
create();
return 0;
}
Accepter::entry() 为Accepter线程的start_rountine,开始接收请求并交给
Simple Messenger处理。
void *Accepter::entry()
{
struct pollfd pfd;
pfd.fd = listen_sd;
pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done) {
int r = poll(&pfd, 1, -1);
if (done) break;
// accept 接收连接请求
entity_addr_t addr;
socklen_t slen = sizeof(addr.ss_addr());
int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
if (sd >= 0) {
// Simple Messenger开始处理这个连接
msgr->add_accept_pipe(sd);
}
}
return 0;
}
Messenger:add_accept_pipe(sd) 新建一个Pipe然后调用start_reader()处理这 个连接,start_reader启动Reader线程调用Pipe:reader()读取信息。
Pipe *SimpleMessenger::add_accept_pipe(int sd)
{
lock.Lock();
Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
p->sd = sd;
p->pipe_lock.Lock();
// 调用start_reader开始处理
p->start_reader();
p->pipe_lock.Unlock();
pipes.insert(p);
accepting_pipes.insert(p);
lock.Unlock();
return p;
}
void Pipe::reader()
{
// Pipe:accept(),调用start_writer()启动Writer线程,writer
// 线程进入cond.Wait()等待被激活。
if (state == STATE_ACCEPTING) {
accept();
}
// loop.
while (state != STATE_CLOSED && state != STATE_CONNECTING) {
char tag = -1;
// 读取信息标签,根据消息类型不同进行不同处理
if (tcp_read((char*)&tag, 1) < 0) {
pipe_lock.Lock();
continue;
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE) {2
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
}
if (tag == CEPH_MSGR_TAG_ACK) {
}
else if (tag == CEPH_MSGR_TAG_MSG) {
// 读取消息
Message *m = 0;
int r = read_message(&m, auth_handler.get());
// 激活writer线程,回复ACK
cond.Signal(); // wake up writer, to ack this
in_q->fast_preprocess(m);
// 根据消息的不同进行不同的处理
// 延时处理
// 快速处理
// 放入队列根据优先级
if (delay_thread) {
delay_thread->queue(release, m);
} else {
if (in_q->can_fast_dispatch(m)) {
in_q->fast_dispatch(m);
if (state == STATE_CLOSED ||notify_on_dispatch_done) {
// there might be somebody waiting
notify_on_dispatch_done = false;
cond.Signal();
}
} else {
// 放入dispatchQueue队列里,交给dispatcher处理
in_q->enqueue(m, m->get_priority(), conn_id);
}
}
}
}
Writer线程启动后,start_rontine为Pipe::writer(), writer进入等待
状态,等待被激活,激活后从Map
void Pipe::writer()
{
pipe_lock.Lock();
while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
if (state != STATE_CONNECTING && state != STATE_WAIT &&
state != STATE_STANDBY && (is_queued() || in_seq >
in_seq_acked)) {
// handle keekalive etc
....
// grab outgoing message from out_q for sending
Message *m = _get_next_outgoing();
if (m) {
// encode and copy out of *m
m->encode(features, msgr->crcflags);
// prepare everything, get message header
const ceph_msg_header& header = m->get_header();
const ceph_msg_footer& footer = m->get_footer();
bufferlist blist = m->get_payload();
blist.append(m->get_middle());
blist.append(m->get_data());
// send message
// write_message()->Pipe::do_sendmsg()->::sendmsg()
int rc = write_message(header, footer, blist);
m->put();
}
}
// wait for wake up
cond.Wait(pipe_lock);
}
}
Reader线程将读取的消息放入DispatchQueue::in_q后, DispathchQueue
将消息放入优先级队列mqueue中,然后激活DispatchThread线程,在线程
的start_routine函数DispatcheQueue::entry()中,调用Messenger::
ms_deliver_dispatch()函数将消息分发给所有已注册的Dispatcher,这
里以Monitor为例,Monitor在ms_dispatch()进一步在dispatch_op()中
根据消息的不同调用不同的handle函数处理, 如PING操作,调用handle
ping(), 在handle_ping()处理完后,将回复消息通过Messenger::send_
reply()再调用Messenger::send_message()通过_submit_message()函数
新建一个Pipe,利用Pipe::_send()将回复消息放入Map
Pipe:: entry() { in_q.enqueue(...) }
--> DispathcQueue:: enqueue() {mqueue.enqueue(..); cond.Wait()}
---> DispatchQueue:: entry() {Messenger::ms_deliver_dispatch()}
----> Messenger:: ms_deliver_dispatch() {Dispatcher::
ms_dispatch() }
-----> Monitor::ms_dispatch() { _ms_dispatch()}
------> Monitor::_ms_dispatch() { dispatch_op()}
-------> Monitor:: dispatch_op() { handle_ping()}
--------> Monitor::handle_ping() {messenger->send_reply()}
---------> SimpleMessenger::send_reply(){ _send_message()}
----------> SimpleMessenger::_send_message() {submit_message()}
-----------> SimpleMessenger::submit_message() {Pipe::send()}
------------> Pipe::_send(){out_q[prt]push_back(); cond.Wait()}
void DispatchQueue::entry()
{
while (true) {
while (!mqueue.empty()) {
QueueItem qitem = mqueue.dequeue();
if (qitem.is_code()) {
....
} else {
// 取出消息
Message *m = qitem.get_message();
if (stop) {
m->put();
} else {
uint64_t msize = pre_dispatch(m);
// 调用Messenger进行消息的分发
msgr->ms_deliver_dispatch(m);
post_dispatch(m, msize);
}
}
}
if (stop)
break;
// wait for something to be put on queue
// 在DispatchQueue::enqueue()被激活
cond.Wait(lock);
}
}
Ceph是一个分布式对象存储和文件系统,且致力于提供高性能,可用性和可扩展性, Ceph最初来源于Sege Weil在University of California, Santa Cruz (UCSC)关于 存储系统的PhD研究项目,现在Ceph是开源的,由其全球性社区的存储工程师和研究者 共同维护。Ceph是统一存储系统,基于RADOS(Reliable, Autonomous, Distributed, Object, Store)提供对象(Object),块(Block),文件(File)三种存储方式。
Ceph存储系统具有如下特点:
Ceph生态系统大致可划分为四个部分:
Figure 1. Ceph生态系统概念架构
客户端使用元数据服务器执行元数据操作来确定数据位置,元数据服务器管理元数据 的位置以及在何处存储新数据。实际上,元数据也是存储在对象存储集群上的,而数 据文件的实际I/O则是发生在客户端和对象存储集群间。如此一来,更高层次的POSIX 功能(如,打开,关闭,重命名)就由元数据服务器管理,而POSIX功能(读和写)则由对 像存储集群管理。
Ceph存储系统支持对象存储,对象存储通过Rados网关进行访问Ceph存储集群。Ceph对 象网关是建立在Librados之上的对象存储接口,为应用提供Ceph存储集群的Restful 网关。 Ceph对象存储支持两种兼容接口: 兼容Amazon S3接口和兼容Swift接口。Ceph 对象存储使用Radosgw与Ceph存储集群交互,Radosgw是一个FastCGI模块。由于Radosgw 提供了兼容Openstack Swift接口和Amazon S3接口,对象存储网关有自己的用户管理。
Figure 2. Rados Gateway
Ceph对象网关Radosgw支持Restful API,如Amazon S3 API,Switf API等。 现在测试Amazon S3 API的python API的相关情况。
Radosgw支持的API有:
feature | status |
---|---|
List Buckets | supported |
Create Bucket | supported |
Delete Bucket | supported |
Bucket ACLs(Get, Put) | supported |
Bucket Location | supported |
Get Bucket Info(Head) | supported |
Put Object | supported |
Get Object | supported |
Delete Object | supported |
Object ACLs(Get, Put) | supported |
Get Object info(Head) | supported |
Post Object | supported |
Copy Object | supported |
Multipart Upload | supported |
要通过S3连接Radosgw访问Ceph,需要先在Radosgw-admin创建S3账户,根据 返回的(AccessKey, SecretKey)和Radosgw的HOST进行连接。
boto.connect_s3(aws_access_key_id=None, aws_secret_access_key=None, **kwargs)
Parameters:
aws_access_key_id (string) – Your AWS Access Key ID
aws_secret_access_key (string) – Your AWS Secret Access Key
Return type: boto.s3.connection.S3Connection
Returns: A connection to Amazon’s S3
创建一个S3Connection对象,进行连接。也可以使用boto.s3.connecton.S3Connection 实例化。
Code: ``` access_key = 'FGOSHYBW8F3GXGAXNR28' secret_key = 'k71RtJRKrMoAyH5IrI/jZ3q0xG+r2UMPhK1c1Bb/'
conn = boto.connect_s3( aws_access_key_id = access_key, aws_secret_access_key = secret_key, host = 'phycles', is_secure=False, calling_format = boto.s3.connection.OrdinaryCallingFormat(), ) ```
Bucket操作一些是S3Connection对象的方法,通过S3Connection进行调用。 一些是Bucket的属性和方法。
create_bucket(bucket_name, headers=None, location='', policy=None)
Parameters:
* bucket_name (string) – The name of the new bucket
* headers (dict) – Additional headers to pass along with the request to AWS.
* location (str) – The location of the new bucket. You can use one of the
constants in boto.s3.connection.Location (e.g.Location.EU,
Location.USWest, etc.).
* policy (boto.s3.acl.CannedACLStrings) – A canned ACL policy that will be
applied to the new key in S3.
S3Connection对象的方法,每个Bucket都是全局唯一的,所以创建Bucket的时候如果 已经存在了,那么会创建失败。默认情况下只需要提供Bucket_name就可以了。
Code:
bucket = conn.create_bucket('test-bucket')
get_all_buckets(headers = None)
S3Connection对象的方法,返回S3账户中所有的bucket。
for bucket in conn.get_all_buckets():
print "{name}\t{created}".format(
name = bucket.name,
created = bucket.creation_date,
)
list(prefix='', delimiter='', marker='', headers=None, encoding_type=None)
Return type: BucketListResultSet
Bucket对象的方法,返回Bucket中所有Object的key链表, 返回的类型为BucketListResultSet.
for key in bucket.list():
print "{name}\t{size}\t{modified}".format(
name = key.name,
size = key.size,
modified = key.last_modified,
)
获取和设置Object的ACL,有public-read, private等。
get_acl(key_name='', headers=None, version_id=None)
get_xml_acl(key_name='', headers=None, version_id=None)
Bucket对象的方法,获得Object的ACL。
set_acl(acl_or_str, key_name='', headers=None, version_id=None)
set_canned_acl(acl_str, key_name='', headers=None, version_id=None)
set_xml_acl(acl_str, key_name='', headers=None, version_id=None,
query_args='acl')
Bucket对象的方法,设置Object的ACL。
Code:
hello_key = bucket.get_key('apple.txt')
hello_key.set_canned_acl('public-read')
delete_bucket(bucket, headers=None)
Parameters:
* bucket_name (string) – The name of the bucket
* headers (dict) – Additional headers to pass along with the
request to AWS.
S3Connection对象的方法,要删除一个bucket,必须要确保bucket为空, bucket不为空,则删除失败。
Code:
conn.delete_bucket(bucket.name)
new_key(key_name=None)
Parameters:
* key_name (string) – The name of the key to create
* Return type: boto.s3.key.Key or subclass
* Returns: An instance of the newly created key object
上传Object要先Bucket里新建一个Key,然后再通过Key上传对象。 上传内容有多种方式:
通过已打开的文件指针所指向的内容作为存储Object的内容。文件为EOF或者文件指针读取 数据到指定的Size时停止。
set_contents_from_file(fp, headers=None, replace=True, cb=None, num_cb=10,
policy=None, md5=None, reduced_redundancy=False, query_args=None,
encrypt_key=False, size=None, rewind=False)
Parameter:
* fp - file pointer whose content to upload
Return type: int
Returns: The number of bytes written to the key.
通过文件名上传Object,上传的Object的数据为文件名所有的文件。(文件路径)
set_contents_from_filename(filename, headers=None, replace=True, cb=None,
num_cb=10, policy=None, md5=None, reduced_redundancy=False, encrypt_key=False)
Parameters:
* filename (string) – The name of the file that you want to put onto S3
Return type: int
Returns: The number of bytes written to the key.
通过字符串上传Object的内容,将字符串上传。 对于对象存储而言,没有文件夹的概念,
所有文件和文件夹都是看成一个Object。所以想要在对象存储表示文件夹,可以在Object
里有字符'/'来表示文件夹意义的标识符,但是利用前面的概念可以新建一个带有'/'的key,
这个key内容为空,来象征性的表示文件夹。这个文件夹里的Object都可以在新建的时候,
通过在key里面以这个文件的key最为前缀来表示,这样就可以表示为这个文件夹里面的文件
了。
set_contents_from_string(string_data, headers=None, replace=True, cb=None,
num_cb=10, policy=None, md5=None, reduced_redundancy=False, encrypt_key=False)
Parameter:
* string_data - the string to upload
通过文件指针所指向的数据流作为上传Object的内容。但是流对象是不可以移位的,数据
大小也是未知的,我们是无法在header里面指定内容的大小和内容的MD5。所以大文件上传
时可以避免计算MD5所带来的延迟,但是无法验证上传的数据。
set_contents_from_stream(fp, headers=None, replace=True, cb=None, num_cb=10,
policy=None, reduced_redundancy=False, query_args=None, size=None)
Parameters:
* fp (file) – the file whose contents are to be uploaded
Code:
key = bucket.new_key('apple.txt')
key.set_contents_from_filename('./apple.txt')
获取对象与上传对象方式有些类似,通过向Bucket提供KeyName获得key对象,再过通Key 获取对象。调用这个方法之前要先确认这个Key是否已经那个创建了。此方法会使用HEAD 方法请求key的存在。
get_key(key_name, headers=None, version_id=None, response_headers=None,
validate=True)
Parameters:
* key_name (string) – The name of the key to retrieve
* headers (dict) – The headers to send when retrieving the key
* version_id (string) –
* response_headers (dict) – A dictionary containing HTTP headers/values
that will override any headers associated with the stored object in
the response.
* validate (bool) – Verifies whether the key exists. If False, this will
not hit the service, constructing an in-memory object. Default is True.
Return type: boto.s3.key.Key
Returns: A Key object from this bucket.
Key对象的方法,以字符串的形式获取Object的内容,Object的内容以字符串返回。
get_contents_as_string(headers=None, cb=None, num_cb=10, torrent=False,
version_id=None, response_headers=None, encoding=None)
Return type: bytes or str
Returns: The contents of the file as bytes or a string
Key对象的方法,通过文件指针的形式获取Object的内容,将Object的内容下载到
已打开的文件指针所指向的文件中。
get_contents_to_file(fp, headers=None, cb=None, num_cb=10, torrent=False,
version_id=None, res_download_handler=None, response_headers=None)
Parameter:
* fp - file pointer to put data in
Return type: None
Key对象的方法,将Object的内容下载到文件名(文件名路径)所在的位置。
get_contents_to_filename(filename, headers=None, cb=None, num_cb=10,torrent
=False, version_id=None, res_download_handler=None, response_headers=None)
Parameter:
filename - file path to put data in
Return type: None
Key对象的方法,将Object的内容直接存储到文件指针所指向的文件。
get_file(fp, headers=None, cb=None, num_cb=10, torrent=False, version_id
=None,override_num_retries=None, response_headers=None)
Parameter:
fp - file pointer to put the data in
Return type: None
Code:
get_key = bucket.get_key('apple.txt')
get_key.get_contents_to_filename('./get.txt')
删除Bucket中的Object,可以指定Object的版本进行删除,如果指定了版本, 那么就只有指定的版本会被删除。
delete_key(key_name, headers=None, version_id=None, mfa_token=None)
Parameters:
key_name (string) – The key name to delete
version_id (string) – The version ID (optional)
mfa_token (tuple or list of strings) – A tuple or list consisting of
the serial number from the MFA device and the current value of the
six-digit token associated with the device. This value is required
anytime you are deleting versioned objects from a bucket that has
the MFADelete option on the bucket.
Return type: boto.s3.key.Key or subclass
Returns: A key object holding information on what was deleted.
从Bucket中删除Object,只需要将Object的Key删除掉就可以了。
bucket.delete_key('apple.txt')
Key对象的方法,获取Object的ACLs,一种以字符串的形式返回,另一种以xml的
形式返回。
get_acl(headers=None)
get_xml_acl(headers=None)
Key对象的方法,设置Object的ACLs,其中acl_str可以为public-read, private等。
通过设置ACL可以用来实现文件的共享。
set_acl(acl_str, headers=None)
set_canned_acl(acl_str, headers=None)
Key的方法,生成访问Key的URL。生成Object的下载URL,可以选择有效时间(以秒为单位)。
可以用来实现外链下载。
generate_url(expires_in, method='GET', headers=None, query_auth=True,
force_http=False, response_headers=None, expires_in_absolute=False,
version_id=None, policy=None, reduced_redundancy=False, encrypt_key=False)
Parameters:
expires_in (int) – How long the url is valid for, in seconds.
method (string) – The method to use for retrieving the file (default is GET).
headers (dict) – Any headers to pass along in the request.
query_auth (bool) – If True, signs the request in the URL.
Return type: string
Returns: The URL to access the key
Code:
hello_url = hello_key.generate_url(0, query_auth=False, force_http=True)
Bucket的方法,生成访问Bucket的URL。
generate_url(expires_in, method='GET', headers=None, force_http=False,
response_headers=None, expires_in_absolute=False)
Bucket的方法,通过复制现有的Key在Bucket新建Key,该方法可以用来实现文件的移动。
copy_key(new_key_name, src_bucket_name, src_key_name, metadata=None,
src_version_id=None, storage_class='STANDARD', preserve_acl=False,
encrypt_key=False, headers=None, query_args=None)
Parameters:
new_key_name (string) – The name of the new key
src_bucket_name (string) – The name of the source bucket
src_key_name (string) – The name of the source key
Return type: boto.s3.key.Key or subclass
Returns: An instance of the newly created key object
Key的方法,将Key复制到其他Bucket中,该方法可以用来实现文件的移动。
copy(dst_bucket, dst_key, metadata=None, reduced_redundancy=False,
preserve_acl=False, encrypt_key=False, validate_dst_bucket=True)
Parameters:
dst_bucket (string) – The name of the destination bucket
dst_key (string) – The name of the destination key
Return type: boto.s3.key.Key or subclass
Returns: An instance of the newly created key object
You can use most of standard Markdown features.