2025-09-10:仓库迁移

This commit is contained in:
2025-09-10 16:12:45 +08:00
parent e0e49b0ac9
commit 3130e336a1
146 changed files with 4066 additions and 0 deletions

View File

@@ -0,0 +1 @@
Symbol,Price,Date,Time,Change,Volume
1 Symbol Price Date Time Change Volume AA 39.48 6/11/2007 9:36am -0.18 181800 AIG 71.38 6/11/2007 9:36am -0.15 195500 AXP 62.58 6/11/2007 9:36am -0.46 935000 BA 98.31 6/11/2007 9:36am +0.12 104800 C 53.08 6/11/2007 9:36am -0.25 360900 CAT 78.29 6/11/2007 9:36am -0.23 225400

View File

@@ -0,0 +1,37 @@
import csv
from collections import namedtuple
if __name__ == "__main__":
# 想要读取csv数据可以使用csv库
with open("6.数据编码与处理/1.stocks.csv") as f:
f_csv = csv.reader(f)
headers = next(f_csv)
print(headers)
for row in f_csv:
print(row)
# 但是这样处理就非常不妙所有的东西混在一起变成奇奇怪怪的样子不如使用named_tuple
f.seek(0)
Row = namedtuple("Row", headers)
for row in f_csv:
print(row)
row = Row(*row)
print(row.Symbol)
f.seek(0)
f_csv = csv.DictReader(f)
for row in f_csv:
print(row['Symbol'])
# 如果想要插入一条数据可以使用writer
rows = [
("BB", 123, "9/12/2024", "9:36AM", -12, 999)
]
with open("6.数据编码与处理/1.stocks.csv", 'w+') as f:
f_csv = csv.writer(f)
f_csv.writerows(rows)
# 我的评价是不如pandas

View File

@@ -0,0 +1,14 @@
import base64
if __name__ == '__main__':
    # Base64 encodes and decodes binary data.
    s = b'Hello World!'
    # base64.b64encode / b64decode operate on bytes.
    a = base64.b64encode(s)
    print(a)
    b = base64.b64decode(a)
    print(b)
    # As in the previous chapter: the result is bytes, so decode once more
    # if a str is wanted.
    a = a.decode('ascii')
    print(a)

Binary file not shown.

View File

@@ -0,0 +1,39 @@
from struct import Struct
def write_records(records, format, f):
    """Pack each tuple in *records* with struct *format* and write it to *f*.

    *f* must be a binary file-like object opened for writing.
    """
    # Pre-compile the format once instead of per record.
    record_struct = Struct(format)
    for r in records:
        f.write(record_struct.pack(*r))
# The principle: the reader knows the same struct layout, so it can read the
# file back in fixed-size chunks and unpack each one — plain pack/unpack.
def read_records(format, file):
    """Return a generator of tuples unpacked from fixed-size records in *file*."""
    record_struct = Struct(format)
    # iter() with a sentinel: keep reading record-sized chunks until read()
    # returns b'' (end of file).
    chunks = iter(lambda: file.read(record_struct.size), b'')
    return (record_struct.unpack(chunk) for chunk in chunks)
if __name__ == '__main__':
    records = [
        (1, 2.3, 4.5),
        (6, 7.8, 9.0),
        (12, 13.4, 56.7)
    ]
    # First build the binary record file (done once, then left commented out):
    # with open('6.数据编码与处理/11.data.bin', 'wb') as f:
    #     write_records(records, '<idd', f)
    with open('6.数据编码与处理/11.data.bin', 'rb') as f:
        for rec in read_records("<idd", f):
            print(rec)
    # To understand Struct further, study what format strings like '<idd' mean.

Binary file not shown.

View File

@@ -0,0 +1,220 @@
import struct, itertools
if __name__ == "__main__":
# 有时候,我们需要将一系列嵌套的可变长度记录与二进制编码之间做一些转换
polys = [
[(1.0, 2.5), (3.5, 4.0), (2.5, 1.5)],
[(7.0, 1.2), (5.1, 3.0), (0.5, 7.5), (0.8, 9.0)],
[(3.4, 6.3), (1.2, 0.5), (4.6, 9.2)]
]
# 现在我们需要将它组织成下面这样的二进制结构
# 文件头
"""
字节 类型 描述
0 int 文件代码(0x1234小端)
4 double x的最小值(小端)
12 double y的最小值(小端)
20 double x的最大值(小端)
28 double y的最大值(小端)
36 int 三角形数量(小端)
"""
# 文件内容
"""
字节 类型 描述
0 int 记录长度(N字节)
4-N Points (X,Y)坐标,以浮点数表示
"""
# 正常情况下,我们通过文件的具体结构来组织二进制文件的写入和读取
def write_ploys(filename, ploys):
    """Write the polygons *ploys* to *filename* in the binary layout above.

    Header: '<iddddi' = file code 0x1234, bounding box, polygon count.
    Each record: 4-byte length prefix followed by the '<dd' points.
    (Name keeps the original 'ploy' spelling so existing callers work.)
    """
    # Flatten the nested point lists to compute the bounding box.
    flattened = list(itertools.chain(*ploys))
    min_x = min(x for x, y in flattened)
    min_y = min(y for x, y in flattened)
    max_x = max(x for x, y in flattened)
    max_y = max(y for x, y in flattened)
    with open(filename, 'wb') as f:
        # Pack and write the 40-byte file header.
        f.write(struct.pack('<iddddi', 0x1234, min_x, min_y, max_x, max_y, len(ploys)))
        for ploy in ploys:
            # Record length = point payload plus the 4-byte length field itself.
            size = len(ploy) * struct.calcsize('<dd')
            f.write(struct.pack('<i', size + 4))
            # Write every (x, y) coordinate pair.
            for pt in ploy:
                f.write(struct.pack('<dd', *pt))
def read_ploys(filename):
    """Read polygons back from a file produced by write_ploys.

    Returns a list of polygons, each a list of (x, y) tuples.
    """
    with open(filename, 'rb') as f:
        # The header is a fixed 40 bytes: '<iddddi'.
        header = f.read(40)
        file_code, min_x, min_y, max_x, max_y, num_ploys = struct.unpack('<iddddi', header)
        # Collect the decoded polygons here.
        ploys = []
        for n in range(num_ploys):
            ploy = []
            # Length prefix of this record (includes its own 4 bytes).
            pbytes, = struct.unpack('<i', f.read(4))
            # Each point is two little-endian doubles = 16 bytes; the extra
            # 4 prefix bytes vanish in the floor division.
            for m in range(pbytes // 16):
                pt = struct.unpack('<dd', f.read(16))
                ploy.append(pt)
            ploys.append(ploy)
        return ploys
# write_ploys("6.数据编码与处理/12.test.bin", ploys=polys)
data = read_ploys("12.test.bin")
print(data)
# 正常来说,我们是用上面这种方法来读取数据,但是这样很乱,所以有了基于类的升级款:
# 字段数据,在外层被 .属性 调用时__get__方法会运作
class StructField:
    """Descriptor that unpacks one struct field from the owner's ``_buffer``.

    Declared on a Structure subclass; attribute access unpacks *format* at
    *offset* inside the instance's buffer.
    """
    def __init__(self, format, offset):
        self.format = format
        self.offset = offset
    def __get__(self, instance, cls):
        # Accessed on the class itself: return the descriptor unchanged.
        if instance is None:
            return self
        else:
            # Unpack straight out of the owning instance's memoryview.
            r = struct.unpack_from(self.format, instance._buffer, self.offset)
            # Single-element formats unwrap to a scalar for convenience.
            return r[0] if len(r) == 1 else r
class Structure:
    """Base class that holds the raw bytes StructField descriptors read."""
    def __init__(self, bytedata):
        # memoryview lets fields slice into the data without copying.
        self._buffer = memoryview(bytedata)
# With the descriptors in place, a file header is just field declarations.
class PolyHeader(Structure):
    """40-byte little-endian header of the polygon file."""
    file_code = StructField('<i', 0)
    min_x = StructField('<d', 4)
    min_y = StructField('<d', 12)
    max_x = StructField('<d', 20)
    max_y = StructField('<d', 28)
    num_poly = StructField('<i', 36)
with open("12.test.bin", 'rb') as f:
phead = PolyHeader(f.read(40))
print(phead.min_x)
# 但是这样还是很麻烦为什么呢因为我还要定义好大一个PolyHeader类里面还要写死一些东西
# 于是就有了上面方案的进化元类版本
# 元类
class StructureMeta(type):
    """Metaclass that turns a ``_fields_`` spec into StructField descriptors."""
    def __init__(self, clsname, bases, clsdict):
        # _fields_ is a list of (format, name) pairs; absent on the base class.
        fields = getattr(self, '_fields_', [])
        byte_order = ''
        offset = 0
        for format, field_name in fields:
            # A leading byte-order code sticks for all subsequent fields.
            if format.startswith(('<', '>', '!', '@')):
                byte_order = format[0]
                format = format[1:]
            format = byte_order + format
            setattr(self, field_name, StructField(format, offset))
            offset += struct.calcsize(format)
        # Total size lets from_file() read exactly one structure.
        setattr(self, 'struct_size', offset)
# Improved Structure: the field layout now lives entirely in _fields_.
class Structure_v2(metaclass=StructureMeta):
    """Buffer-backed base class whose fields come from ``_fields_``."""
    def __init__(self, bytedata):
        self._buffer = memoryview(bytedata)
    # Alternate constructor: read exactly struct_size bytes from a file.
    @classmethod
    def from_file(cls, f):
        return cls(f.read(cls.struct_size))
# After the rework, a header is nothing but names and format codes.
class PolyHeader_v2(Structure_v2):
    _fields_ = [
        ('<i', 'file_code'),
        ('d', 'min_x'),
        ('d', 'min_y'),
        ('d', 'max_x'),
        ('d', 'max_y'),
        ('i', 'num_polys'),
    ]
with open("12.test.bin", 'rb') as f:
phead = PolyHeader_v2.from_file(f)
print(phead.max_y)
# 这个东西还能继续优化,比如加入一些新功能
# 改进的元类如果输入的是一个对象就设置为NestedStruct如果是格式就设置为字段
class StructureMeta_v2(type):
    """Like StructureMeta, but a field may itself be a nested structure type."""
    def __init__(self, clsname, bases, clsdict):
        fields = getattr(self, '_fields_', [])
        byte_order = ''
        offset = 0
        for format, field_name in fields:
            if isinstance(format, StructureMeta_v2):
                # Nested structure: delegate to a NestedStruct descriptor
                # and advance by the nested type's total size.
                setattr(self, field_name, NestedStruct(field_name, format, offset))
                offset += format.struct_size
            else:
                # Plain format string: same byte-order handling as before.
                if format.startswith(('<', '>', '!', '@')):
                    byte_order = format[0]
                    format = format[1:]
                format = byte_order + format
                setattr(self, field_name, StructField(format, offset))
                offset += struct.calcsize(format)
        setattr(self, 'struct_size', offset)
# Descriptor for a field that is itself a Structure.
class NestedStruct:
    """Lazily builds a nested structure from a slice of the owner's buffer."""
    def __init__(self, name, struct_type, offset):
        self.name = name
        self.struct_type = struct_type
        self.offset = offset
    def __get__(self, instance, cls):
        if instance is None:
            return self
        else:
            # Slice the sub-buffer (zero-copy), construct the nested
            # structure, then cache it on the instance so this descriptor
            # only fires once per instance.
            data = instance._buffer[self.offset: self.offset + self.struct_type.struct_size]
            result = self.struct_type(data)
            setattr(instance, self.name, result)
            return result
# Base class: stores a memoryview so nested fields load lazily, zero-copy.
class Structure_v3(metaclass=StructureMeta_v2):
    def __init__(self, bytedata):
        self._buffer = memoryview(bytedata)
    # Alternate constructor: read exactly one structure from a binary file.
    @classmethod
    def from_file(cls, f):
        return cls(f.read(cls.struct_size))
class Point(Structure_v3):
    """A single (x, y) point: two little-endian doubles."""
    _fields_ = [
        ('<d', 'x'),
        ('d', 'y')
    ]
class PloyHeader(Structure_v3):
    """File header using nested Point structures for the bounding box."""
    _fields_ = [
        ('<i', 'file_code'),
        (Point, 'min'),
        (Point, 'max'),
        ('i', 'num_polys'),
    ]
with open("12.test.bin", 'rb') as f:
phead = PloyHeader.from_file(f)
print(phead.min)

View File

@@ -0,0 +1,69 @@
import json
from collections import OrderedDict
if __name__ == '__main__':
    # JSON is the lingua franca of front/back-end data interchange; most
    # payloads get serialized to a JSON string before transmission.
    # The workhorses are json.dumps and json.loads.
    data = {
        "name": 'ACME',
        "shares": 100,
        "price": 542.33
    }
    data_json = json.dumps(data)
    print(data_json)
    data = json.loads(data_json)
    print(data)
    # When working with .json files directly, use json.dump / json.load.
    # loads can also build an OrderedDict — or any object — instead of a dict.
    data = json.loads(data_json, object_pairs_hook=OrderedDict)
    print(data)
    class JsonObj:
        # Adopt the decoded dict as this object's attribute namespace.
        def __init__(self, transform_data):
            self.__dict__ = transform_data
    data = json.loads(data_json, object_hook=JsonObj)
    print(data.name)
    # Arbitrary objects are not JSON-serializable directly; they must be
    # mapped to a serializable dict first, and rebuilt on the way back.
    class Point:
        def __init__(self, x, y):
            self.x = x
            self.y = y
    # Registry used on load to look the class back up by name.
    classes = {
        "Point": Point
    }
# object -> JSON-ready dict
def serialize_instance(obj):
    """Map *obj* to a dict tagged with its class name, for json.dumps(default=...)."""
    d = {'__class__': type(obj).__name__}
    # vars(obj) is the instance __dict__ — all plain attributes.
    d.update(vars(obj))
    return d
# tagged dict -> object
def unserialize_instance(dic):
    """Rebuild an instance from a dict produced by serialize_instance.

    Dicts without a '__class__' tag are returned unchanged; tagged dicts
    are looked up in the module-level `classes` registry.
    """
    cls_name = dic.pop('__class__', None)
    if cls_name:
        cls = classes[cls_name]
        # __new__ skips __init__; attributes are restored directly.
        obj = cls.__new__(cls)
        for k, v in dic.items():
            setattr(obj, k, v)
        return obj
    else:
        return dic
# Round-trip a Point through JSON using the two hooks above
p = Point(2, 3)
s = json.dumps(p, default=serialize_instance)
print(s)
a = json.loads(s, object_hook=unserialize_instance)
print(a.x, a.y)

View File

@@ -0,0 +1,17 @@
from urllib.request import urlopen
from xml.etree.ElementTree import parse
if __name__ == "__main__":
u = urlopen('http://planet.python.org/rss20.xml')
# 使用xml.etree.ElementTree的parse方法来解析文档
doc = parse(u)
print(doc)
# 可以使用.iterfind来对解析出来的文档进行迭代找到所有指定名称的节点
for item in doc.iterfind('channel/item'):
# 使用findtext在节点中寻找需要的数据
title = item.findtext('title')
date = item.findtext('pubDate')
link = item.findtext('link')
print(title, date, link, sep="\n")

View File

@@ -0,0 +1,45 @@
from xml.etree.ElementTree import iterparse
from urllib.request import urlopen
# When memory matters more than speed, parse the XML incrementally.
def parse_and_remove(filename, path):
    """Yield elements whose tag path equals *path*, pruning them as we go.

    *filename* is anything iterparse accepts (path or binary file-like).
    Matched elements are detached from their parent after being yielded so
    the tree never holds the whole document.
    """
    path_parts = path.split('/')
    doc = iterparse(filename, events=('start', 'end'))
    # Skip the 'start' event of the root element.
    next(doc)
    # Parallel stacks of open tag names and their elements.
    tag_stack = []
    elem_stack = []
    for event, elem in doc:
        # iterparse yields (event, element) pairs.
        if event == 'start':
            tag_stack.append(elem.tag)
            elem_stack.append(elem)
        elif event == 'end':
            # If the open-tag path matches the target, hand the element
            # out and detach it from its parent so it can be reclaimed.
            if tag_stack == path_parts:
                yield elem
                elem_stack[-2].remove(elem)
            try:
                tag_stack.pop()
                elem_stack.pop()
            except IndexError:
                # The root's 'end' event arrives with empty stacks.
                pass
if __name__ == '__main__':
    u = urlopen('http://planet.python.org/rss20.xml')
    # Lazily stream the item titles out of the feed.
    data = parse_and_remove(u, 'channel/item/title')
    # print(data)
    for d in data:
        print(d.text)

View File

@@ -0,0 +1,21 @@
from xml.etree.ElementTree import Element
def dict_to_xml(tag, d):
    """Build an Element named *tag* with one child element per key of dict *d*.

    Values are stringified with str(); insertion order of *d* is preserved.
    """
    elem = Element(tag)
    # One child element per key/value pair.
    for k, v in d.items():
        child = Element(k)
        child.text = str(v)
        elem.append(child)
    return elem
if __name__ == '__main__':
    s = {"name": 'GOOG',
         "shares": 100,
         "price": 490.1}
    e = dict_to_xml("stock", s)
    print(e)

View File

@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<stop>
<id>14791</id>
<nm>Clark &amp; Balmoral</nm>
<sri>
<rt>22</rt>
<d>North Bound</d>
<dd>North Bound</dd>
</sri>
<cr>22</cr>
<pre>
<pt>5 MIN</pt>
<fd>Howard</fd>
<v>1378</v>
<rn>22</rn>
</pre>
<pre>
<pt>15 MIN</pt>
<fd>Howard</fd>
<v>1867</v>
<rn>22</rn>
</pre>
</stop>

View File

@@ -0,0 +1,17 @@
<?xml version='1.0' encoding='us-ascii'?>
<stop>
<id>14791</id>
<nm>Clark &amp; Balmoral</nm>
<spam>This is a test</spam><pre>
<pt>5 MIN</pt>
<fd>Howard</fd>
<v>1378</v>
<rn>22</rn>
</pre>
<pre>
<pt>15 MIN</pt>
<fd>Howard</fd>
<v>1867</v>
<rn>22</rn>
</pre>
</stop>

View File

@@ -0,0 +1,22 @@
from xml.etree.ElementTree import parse, Element
if __name__ == '__main__':
    doc = parse(r'6.数据编码与处理/6.test.xml')
    # Grab the document root element.
    root = doc.getroot()
    print(root)
    # Remove a couple of nodes found directly under the root.
    root.remove(root.find('sri'))
    root.remove(root.find('cr'))
    # Index of a specific child. Element.getchildren() was deprecated and
    # removed in Python 3.9; list(root) is the supported spelling.
    print(list(root).index(root.find('nm')))
    # Create a new element and insert it under the root.
    e = Element('spam')
    e.text = 'This is a test'
    root.insert(2, e)
    # Persist the modified tree back to disk.
    doc.write(r'6.数据编码与处理/6.test_result.xml', xml_declaration=True)

View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<top>
<author>David Beazley</author>
<content>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>Hello World</title>
</head>
<body>
<h1>Hello World!</h1>
</body>
</html>
</content>
</top>

View File

@@ -0,0 +1,34 @@
from xml.etree.ElementTree import parse
class XMLNameSpace:
    """Expands {name} placeholders in an ElementTree path to {uri} syntax.

    Register short names for namespace URIs, then call the instance with a
    path like 'content/{html}html' to get the fully-qualified form.
    """
    def __init__(self, **kwargs):
        self.namespaces = {}
        for name, url in kwargs.items():
            self.register(name, url)
    def register(self, name, url):
        # ElementTree expects namespaces written as '{uri}tag'.
        self.namespaces[name] = '{'+url+'}'
    def __call__(self, path):
        # format_map substitutes each {name} with its '{uri}' expansion.
        return path.format_map(self.namespaces)
if __name__ == '__main__':
    # With namespaced XML, the plain path syntax quickly becomes painful.
    doc = parse('6.数据编码与处理/7.exp.xml')
    print(doc.findtext('author'))
    print(doc.find('content'))
    # <html> lives in a namespace, so the unqualified path finds nothing.
    print(doc.find('content/html'))
    # Every namespaced level must be qualified with the {...} prefix.
    print(doc.find('content/{http://www.w3.org/1999/xhtml}html'))
    # Reminder: EVERY level.
    # Does not work:
    print(doc.find('content/{http://www.w3.org/1999/xhtml}html/head'))
    # Works:
    print(doc.find('content/{http://www.w3.org/1999/xhtml}html/{http://www.w3.org/1999/xhtml}head'))
    # Typing namespaces by hand is tedious — let a helper class expand them.
    ns = XMLNameSpace(html="http://www.w3.org/1999/xhtml")
    print(doc.find(ns('content/{html}html')))

Binary file not shown.

View File

@@ -0,0 +1,33 @@
import sqlite3
if __name__ == "__main__":
# 在Python中数据库的输入和输出使用如下元组来表示
stocks = [
('GooG', 100, 490.1),
('AAPL', 50, 545.75),
('FB', 150, 7.45),
('HPQ', 75, 33.2)
]
# 首先我们需要创建一个数据库连接
db = sqlite3.connect('8.db_test_sqlite3.db')
print(db)
# 然后创建一个游标与数据库进行交互
c = db.cursor()
# c.execute('create table portfolio (symbol text, shares integer, price real)')
# db.commit()
# 使用execute和executemany命令与数据库进行交互
# 务必使用作为占位符让SQL执行替换否则这就是SQL注入攻击的漏洞
c.executemany('insert into portfolio values (?, ?, ?)', stocks)
db.commit()
for row in db.execute('select * from portfolio'):
print(row)
c.close()
db.close()
# 好消息是大部分的网络框架在需要数据库时都做了ORM封装不用这么麻烦的去写代码调动数据库了

View File

@@ -0,0 +1,21 @@
import base64, binascii
if __name__ == '__main__':
    s = b'Hello World!'
    # Hex-encode the byte string.
    h = binascii.b2a_hex(s)
    print(h)
    # Decode the hex representation back to bytes.
    b = binascii.a2b_hex(h)
    print(b)
    # The familiar base64 module offers the same via b16encode/b16decode.
    h = base64.b16encode(s)
    print(h)
    b = base64.b16decode(h)
    print(b)
    # The base64 API feels cleaner. As before, decode once more for a
    # unicode str.
    print(h.decode('ascii'))