测试平台系列(90) 编写oss客户端

虚幻大学 xuhss 434℃ 0评论

Python微信订餐小程序课程视频

https://edu.csdn.net/course/detail/36074

Python实战量化交易理财系统

https://edu.csdn.net/course/detail/35475

大家好~我是米洛
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的教程,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记,获取最新文章教程!

回顾

上一节我们编写了在线执行测试计划功能,并稍微改了下报告页面。那其实我们之前的内容都是有很多在里面的。

比如http请求只支持了json和form,没有支持文件上传的请求,甚至有一些crud的功能都没有太完善。

不过不要紧,我的想法还是先创造,再完善。当然也不是盲目创造,也得提前预判好后面的走向。

为什么要用oss

pity里面oss打算用在2个地方,第一个就是一些静态资源图片。比方说项目图片用户头像

另一个地方就是上文说的,测试文件上传接口的时候,我们需要测试文件。这些文件怎么来,怎么管理?都得借助oss来完成。

目前以我熟悉的oss为例,打算支持以下几种oss:

  • 阿里云oss ✔
  • 腾讯云cos 腾讯云cos(无账号) ?
  • 七牛云(免费,我能给展示demo)
  • gitee(免费)

其中阿里云和腾讯云由于都是付费产品,博主还是买不起的。所以可能需要有对应的测试账号,在此感谢小右(Mini-Right)的帮助,给了我一个阿里云的测试账号,保证了crud能正常进行。

如果需要我实现其他oss客户端,请带上对应的测试账号私信我哈。

测试网站的数据估计会用gitee或者七牛云

说明

关于文件管理这块,由于时间关系,我暂时不会落一张表与oss数据进行关联,主要目的是为了节省时间

表关联可以作为二期工程。

编写oss基类

新建app.middleware.oss.oss_file.py

由于咱们支持多种oss客户端,所以包装好一个抽象类(类似go的interface)等着各个oss客户端去实现之。

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

`from abc import ABC, abstractmethod
from typing import ByteString

class OssFile(ABC):

@abstractmethod
def create_file(self, filepath: str, content: ByteString):
pass
@abstractmethod
def update_file(self, filepath: str, content: ByteString):
pass
@abstractmethod
def delete_file(self, filepath: str):
pass
@abstractmethod
def list_file(self):
pass
@abstractmethod
def download_file(self, filepath):
pass`
抽象类没有具体的实现,只有方法的定义,目的是为了限制子类的方法,即必须实现基类中定义的方法。

总结了一下,必须有增删改查下载5种方法。

编写AliyunOss实现

新建app.middleware.oss.aliyun.py

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

`from typing import ByteString

import oss2

from app.middleware.oss.files import OssFile

class AliyunOss(OssFile):

def __init__(self, access_key_id: str, access_key_secret: str, endpoint: str, bucket: str):
auth = oss2.Auth(access_key_id=access_key_id,
access_key_secret=access_key_secret)

auth = oss2.AnonymousAuth()

self.bucket = oss2.Bucket(auth, endpoint, bucket)

def create_file(self, filepath: str, content: ByteString):
self.bucket.put_object(filepath, content)

def update_file(self, filepath: str, content: ByteString):
self.bucket.put_object(filepath, content)

def delete_file(self, filepath: str):
self.bucket.delete_object(filepath)

def list_file(self):
ans = []
for obj in oss2.ObjectIteratorV2(self.bucket):
ans.append(dict(key=obj.key, last_modified=obj.last_modified,
size=obj.size, owner=obj.owner))
return ans

def download_file(self, filepath):
if not self.bucket.object_exists(filepath):
raise Exception(f"oss文件: {filepath}不存在")
return self.bucket.get_object(filepath)`
AliyunOss继承了OssFile,构造方法获取阿里云的身份信息,并验证。最后读取bucket,这bucket我理解的是一块区域,你的文件都存储在这块区域里面,我们就叫他F盘吧。

其他的方法很简单,基本上是调用对应的api,去做crud操作。oss没有文件夹的概念,都是统一用路径来存储文件地址的,比如:

woody/github.txt

这个文件路径可以理解为,woody目录下的github.txt文件。

编写获取客户端方法

app.middleware.oss._\_init__.py

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

`from app.core.configuration import SystemConfiguration
from app.middleware.oss.aliyun import AliyunOss
from app.middleware.oss.files import OssFile

class OssClient(object):
_client = None
@classmethod
def get_oss_client(cls) -> OssFile:
"""
通过oss配置拿到oss客户端
:return:
"""
if OssClient._client is None:
cfg = SystemConfiguration.get_config()
oss_config = cfg.get("oss")
access_key_id = oss_config.get("access_key_id")
access_key_secret = oss_config.get("access_key_secret")
bucket = oss_config.get("bucket")
endpoint = oss_config.get("endpoint")
if oss_config is None:
raise Exception("服务器未配置oss信息, 请在configuration.json中添加")
if oss_config.get("type").lower() == "aliyun":
return AliyunOss(access_key_id, access_key_secret, endpoint, bucket)
raise Exception("不支持的oss类型")
return OssClient._client`
我们在configuration.json配置oss信息,接着每次都从OssClient获取客户端即可。

5d2139fc1f2b4148779489a4bdb774e4 - 测试平台系列(90) 编写oss客户端

编写后端接口

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

`from fastapi import APIRouter, File, UploadFile

from app.handler.fatcory import PityResponse
from app.middleware.oss import OssClient

router = APIRouter(prefix="/oss")

@router.post("/upload")
async def create_oss_file(filepath: str, file: UploadFile = File(...)):
try:
file_content = await file.read()

获取oss客户端

client = OssClient.get_oss_client()
client.create_file(filepath, file_content)
return PityResponse.success()
except Exception as e:
return PityResponse.failed(f"上传失败: {e}")

@router.get("/list")
async def list_oss_file():
try:
client = OssClient.get_oss_client()
files = client.list_file()
return PityResponse.success(files)
except Exception as e:
return PityResponse.failed(f"获取失败: {e}")

@router.get("/delete")
async def delete_oss_file(filepath: str):
try:
client = OssClient.get_oss_client()
client.delete_file(filepath)
return PityResponse.success()
except Exception as e:
return PityResponse.failed(f"删除失败: {e}")

@router.post("/update")
async def update_oss_file(filepath: str, file: UploadFile = File(...)):
"""
更新oss文件,路径不能变化
:param filepath:
:param file:
:return:
"""
try:
client = OssClient.get_oss_client()
file_content = await file.read()
client.update_file(filepath, file_content)
return PityResponse.success()
except Exception as e:
return PityResponse.failed(f"删除失败: {e}")`
方法很简单,文件路径在url参数里边,文件的话,利用fastapi里面的File和UploadFile皆可获取到上传的文件。

注意,必须先安装python-multipart库配合文件上传

测试一下

成功读取到了oss的文件信息,但下载接口

57090f8a7fd463dc00463ef070713ccd - 测试平台系列(90) 编写oss客户端

好像忘记下载文件相关内容了,那么参考我的上一篇FsatApi下载文件的文章吧。

FastApi下载文件

或者直接去github查看源码~

今天的内容就介绍到这里了,下一节卷oss的用途。

转载请注明:xuhss » 测试平台系列(90) 编写oss客户端

喜欢 (0)

您必须 登录 才能发表评论!