我帮您扩充和润色这篇教程的开篇部分。建议标题可以改为更具描述性的:“Pandas 数据分析实战指南”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
---
title: Pandas 数据分析实战指南
date: 2024-12-06 17:55:55
sticky: 1
tags:
- Python
- 数据分析
- Pandas
categories:
- 编程技术
description: 深入浅出的 Pandas 教程,带你掌握数据分析利器
cover: [建议放置一张与数据分析相关的封面图]
---

## 唠唠闲话

Pandas 的名字来源于"Panel Data"(面板数据),作为 Python 生态系统中最受欢迎的数据分析工具之一,Pandas 拥有强大的功能和简洁的语法。

本教程将从零开始,介绍 Pandas 的核心功能。

## 安装

通过 pip 安装:

```python
pip install pandas

数据读写操作

Pandas 提供了丰富的数据导入功能,支持多种数据格式。

读取数据

CSV 文件读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pandas as pd

# 基础读取
df = pd.read_csv('data.csv')

# 常用参数示例
df = pd.read_csv('data.csv',
encoding='utf-8', # 文件编码
sep=',', # 分隔符
header=0, # 指定表头行
index_col=0, # 指定索引列
skiprows=[0,2], # 跳过指定行
na_values=['NA', 'missing'], # 指定空值标记
nrows=1000 # 读取的行数
)

Excel 文件读取

1
2
3
4
5
6
7
8
9
10
# 基础读取
df = pd.read_excel('data.xlsx')

# 带参数的读取
df = pd.read_excel('data.xlsx',
sheet_name='Sheet1', # 指定工作表
header=0, # 表头行位置
skiprows=2, # 跳过前两行
usecols='A:C' # 使用的列范围
)

SQL 数据库读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import pymysql
from sqlalchemy import create_engine

# 创建数据库连接
engine = create_engine('mysql+pymysql://user:password@localhost/dbname')

# 读取SQL查询结果
df = pd.read_sql('SELECT * FROM table_name', engine)

# 读取整个表
df = pd.read_sql_table('table_name', engine)

# 使用SQL查询语句
df = pd.read_sql_query('SELECT * FROM table_name WHERE column > 5', engine)

JSON 文件读取

1
2
3
4
5
6
7
8
9
10
11
12
# 标准JSON文件
df = pd.read_json('data.json',
orient='records', # JSON数据的格式
lines=False, # 是否每行一个JSON对象
encoding='utf-8' # 文件编码
)

# JSONL(JSON Lines)文件读取
df = pd.read_json('data.jsonl',
lines=True, # 按行读取JSON
orient='records' # JSON格式说明
)

这里 orient 参数决定 JSON 的数据结构:

  1. records(常用):每行数据是一个字典
1
[{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
  1. columns(默认):以列为主体
1
{"name": {"0": "Alice", "1": "Bob"}, "age": {"0": 25, "1": 30}}
  1. index:以行索引为主体
1
{"0": {"name": "Alice", "age": 25}, "1": {"name": "Bob", "age": 30}}

一般使用 records,格式最直观,一行数据对应一个完整记录。

查看数据概要

读取数据后,快速了解数据的基本情况:

1
2
3
4
5
6
7
8
9
10
11
12
# 创建示例DataFrame
df = pd.DataFrame({
'A': [1, 2, 3],
'B': ['a', 'b', 'c'],
'C': [1.1, 2.2, 3.3]
})

# 查看前n行数据(默认5行)
df.head(n=5)

# 查看后n行数据
df.tail(n=5)

查看属性相关,或统计相关:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
>>> df.shape
(3, 3)
>>>
>>> df.columns
Index(['A', 'B', 'C'], dtype='object')
>>>
>>> df.index
RangeIndex(start=0, stop=3, step=1)
>>>
>>> df.dtypes
A int64
B object
C float64
dtype: object
>>>
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 A 3 non-null int64
1 B 3 non-null object
2 C 3 non-null float64
dtypes: float64(1), int64(1), object(1)
memory usage: 204.0+ bytes
>>>
>>> df.describe()
A C
count 3.0 3.00
mean 2.0 2.20
std 1.0 1.10
min 1.0 1.10
25% 1.5 1.65
50% 2.0 2.20
75% 2.5 2.75
max 3.0 3.30

数据保存

Pandas 同样支持将数据保存为多种格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 保存为CSV
df.to_csv('output.csv',
index=False, # 是否保存索引
encoding='utf-8', # 文件编码
sep=',', # 分隔符
na_rep='NULL' # 空值的表示方式
)

# 保存为Excel
df.to_excel('output.xlsx',
sheet_name='Sheet1', # 工作表名称
index=False, # 是否保存索引
freeze_panes=(1,0) # 冻结窗格
)

# 保存为JSON
df.to_json('output.json',
orient='records', # JSON格式
lines=True, # 是否每行一个JSON对象
force_ascii=False # 允许非ASCII字符
)

# 保存到SQL数据库
df.to_sql('table_name',
engine, # SQLAlchemy引擎
if_exists='replace', # 如果表存在的处理方式
index=False # 是否保存索引
)

此外,在处理大型数据集时,可以使用 chunksize 参数分块读取数据,可以有效控制内存使用,比如:

1
2
3
for chunk in pd.read_csv('large_file.csv', chunksize=10000):
# 处理每个数据块
process_chunk(chunk)

Pandas 核心数据结构

Pandas 主要有两种数据结构:Series 和 DataFrame。法。

Series

Series 是一种类似于一维数组的对象,由数据和索引组成。可以理解为带有标签的一维数组。

创建 Series

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pandas as pd

# 从列表创建
s1 = pd.Series([1, 2, 3, 4, 5])
print(s1)
# 输出:
# 0 1
# 1 2
# 2 3
# 3 4
# 4 5
# dtype: int64

# 指定索引
s2 = pd.Series([1, 2, 3, 4, 5],
index=['a', 'b', 'c', 'd', 'e'])
print(s2)
# 输出:
# a 1
# b 2
# c 3
# d 4
# e 5
# dtype: int64

# 从字典创建
s3 = pd.Series({
'a': 1,
'b': 2,
'c': 3
})
print(s3)

基本属性和操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 获取索引
print(s2.index) # Index(['a', 'b', 'c', 'd', 'e'], dtype='object')

# 获取值
print(s2.values) # array([1, 2, 3, 4, 5])

# 类型查看
print(s2.dtype) # int64

# 基本统计
print(s2.mean()) # 平均值
print(s2.sum()) # 求和
print(s2.std()) # 标准差

# 索引操作
print(s2['a']) # 通过标签访问
print(s2[['a', 'b']]) # 访问多个元素

DataFrame

DataFrame 是一个二维的表格型数据结构,可以看作由多个 Series 组成的字典。它同时具有行索引和列索引。

创建 DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 从字典创建
df1 = pd.DataFrame({
'name': ['John', 'Anna', 'Peter'],
'age': [28, 22, 35],
'city': ['New York', 'Paris', 'London']
})

# 从列表创建
data = [
['John', 28, 'New York'],
['Anna', 22, 'Paris'],
['Peter', 35, 'London']
]
df2 = pd.DataFrame(data,
columns=['name', 'age', 'city'],
index=['p1', 'p2', 'p3'])

# 从Series字典创建
df3 = pd.DataFrame({
'name': pd.Series(['John', 'Anna', 'Peter']),
'age': pd.Series([28, 22, 35])
})

DataFrame 的基本属性

1
2
3
4
5
6
7
8
9
10
11
12
# 查看数据类型
print(df1.dtypes)

# 查看索引
print(df1.index) # 行索引
print(df1.columns) # 列索引

# 查看维度
print(df1.shape) # (行数, 列数)

# 基本信息
df1.info() # 显示基本信息摘要

数据查询和选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 选择单列(返回Series)
print(df1['name'])
print(df1.name) # 属性方式访问(不推荐)

# 选择多列(返回DataFrame)
print(df1[['name', 'age']])

# 使用loc按标签选择行
print(df2.loc['p1']) # 选择单行
print(df2.loc[['p1', 'p2']]) # 选择多行
print(df2.loc['p1':'p2']) # 选择连续的行(包含结束索引)

# 使用iloc按位置选择行
print(df1.iloc[0]) # 选择第一行
print(df1.iloc[0:2]) # 选择前两行(不包含结束索引)

# 条件选择
print(df1[df1['age'] > 25]) # 选择年龄大于25的行

常用操作

1
2
3
4
5
6
7
8
9
10
11
12
# 添加新列
df1['salary'] = [50000, 45000, 60000]

# 删除列
df2 = df1.drop('salary', axis=1) # axis=1表示列

# 重命名列
df1 = df1.rename('name': 'full_name'})
columns={
# 排序
df1 = df1.sort_values('age', ascending=False) # 按年龄降序
df1 = df1.sort_index() # 按索引排序

数据分析处理

让我们以一个天气数据集为例,展示 Pandas 的数据分析处理功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
data = {
'date': pd.date_range('2024-12-01', '2024-12-30'),
'temperature': ['25C', '23C', '22C', '20C', '18C', '15C'] * 5,
'humidity': [45, 50, 55, 60, 65, 70] * 5,
'weather': ['Sunny', 'Cloudy', 'Rain', 'Sunny', 'Cloudy', 'Snow'] * 5
}
df = pd.DataFrame(data)

# 设置日期索引
df.set_index('date', inplace=True)

# 清理温度数据
df.loc[:, 'temperature'] = df['temperature'].str.replace("C", "").astype('int32')

高级查询方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1. 单值查询
temp = df.loc['2024-12-07', 'temperature'] # 返回标量

# 2. 单行多列查询
data = df.loc['2024-12-07', ['temperature', 'humidity']] # 返回Series

# 3. 日期范围查询
week_data = df.loc['2024-12-01':'2024-12-07', 'temperature']

# 4. 条件查询
cold_days = df.loc[df['temperature'] < 15] # 温度低于15度的天数
humid_days = df.loc[(df['humidity'] > 60) & (df['weather'] == 'Rain')] # 多条件组合

# 5. 函数查询
df.loc[lambda df: (df['temperature'] < 20) & (df['humidity'] > 50)]

# 6. 使用isin进行成员查询
df.loc[df['weather'].isin(['Snow', 'Rain'])]

修改数据

各种数据修改和转换方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 1. 直接修改
df.loc[:, 'temperature'] = df['temperature'] + 273.15 # 转换为开尔文温度

# 2. 新增列
df.loc[:, 'feels_like'] = df['temperature'] - 3 # 体感温度

# 3. apply函数应用
# 按列应用(默认axis=0)
def celsius_to_fahrenheit(c):
return c * 9/5 + 32

df['fahrenheit'] = df['temperature'].apply(celsius_to_fahrenheit)

# 按行应用(axis=1)
def weather_summary(row):
return f"Temperature: {row['temperature']}°C, Weather: {row['weather']}"

df['summary'] = df.apply(weather_summary, axis=1)

# 4. assign方法(创建新的DataFrame)
df_new = df.assign(
temp_normalized = lambda x: (x['temperature'] - x['temperature'].mean()) / x['temperature'].std(),
is_warm = lambda x: x['temperature'] > 20,
humidity_level = lambda x: pd.cut(x['humidity'],
bins=[0, 30, 60, 100],
labels=['Low', 'Medium', 'High'])
)

数据合并

展示不同的数据合并场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 准备示例数据
df1 = pd.DataFrame({
'city': ['北京', '上海', '广州'],
'temperature': [20, 25, 28],
'humidity': [50, 60, 70]
})

df2 = pd.DataFrame({
'city': ['北京', '上海', '深圳'],
'wind_speed': [10, 12, 15],
'pressure': [1013, 1015, 1012]
})

# 1. 基础合并
pd.merge(df1, df2, on='city') # 默认inner join

# 2. 不同类型的连接
# 外连接(保留所有数据)
pd.merge(df1, df2, on='city', how='outer')

# 左连接(保留左表所有数据)
pd.merge(df1, df2, on='city', how='left')

# 3. 使用不同的键合并
df2.rename(columns={'city': 'city_name'}, inplace=True)
pd.merge(df1, df2, left_on='city', right_on='city_name')

# 4. 索引合并
df1.set_index('city', inplace=True)
df2.set_index('city_name', inplace=True)
pd.merge(df1, df2, left_index=True, right_index=True)

常用统计方法

统计分析方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 1. 值计数
weather_counts = df['weather'].value_counts() # 各类天气出现次数
weather_counts_normalized = df['weather'].value_counts(normalize=True) # 百分比

# 2. 描述性统计
stats = df.describe() # 包含count, mean, std, min, 25%, 50%, 75%, max
stats_by_weather = df.groupby('weather')['temperature'].describe() # 按天气分组统计

# 3. 唯一值查看
unique_weather = df['weather'].unique() # 唯一值数组
nunique_weather = df['weather'].nunique() # 唯一值数量

# 4. 相关性分析
correlation = df[['temperature', 'humidity']].corr() # 相关系数矩阵
covariance = df[['temperature', 'humidity']].cov() # 协方差矩阵

# 5. 分组统计
monthly_stats = df.groupby(df.index.month).agg({
'temperature': ['mean', 'min', 'max'],
'humidity': ['mean', 'std'],
'weather': lambda x: x.value_counts().index[0] # 最常见的天气
})

注意:

  1. 使用 inplace=True 时要小心,它会直接修改原始数据
  2. 合并操作前最好检查重复值情况
  3. 统计方法通常会自动忽略 NaN

并行处理

在处理大规模数据时,利用并行计算可以显著提升处理效率。Pandas 提供了几种并行处理的方法。

1. 使用 pandarallel

pandarallel 是一个简单但强大的 Pandas 并行处理库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pandarallel import pandarallel

# 初始化并行环境
pandarallel.initialize(progress_bar=True, nb_workers=4)

# 示例数据
df = pd.DataFrame({
'A': range(100000),
'B': range(100000)
})

# 普通操作
def complex_operation(x):
# 模拟耗时操作
time.sleep(0.001)
return x ** 2 + x ** 3

# 串行处理
df['result_serial'] = df['A'].apply(complex_operation)

# 并行处理
df['result_parallel'] = df['A'].parallel_apply(complex_operation)

2. 使用 Dask

Dask 是一个灵活的并行计算库,可以处理超大规模数据。

1
2
3
4
5
6
7
8
import dask.dataframe as dd

# 将Pandas DataFrame转换为Dask DataFrame
ddf = dd.from_pandas(df, npartitions=4)

# 并行处理
result = ddf.map_partitions(lambda df: df.apply(complex_operation))
result = result.compute() # 获取结果

3. 分块处理大数据

对于超大文件,可以使用分块读取和处理:

1
2
3
4
5
6
7
8
9
10
11
12
def process_chunk(chunk):
# 对每个数据块进行处理
return chunk.apply(complex_operation)

# 分块读取并处理
chunks = []
for chunk in pd.read_csv('large_file.csv', chunksize=10000):
processed = process_chunk(chunk)
chunks.append(processed)

# 合并结果
result = pd.concat(chunks)

4. 使用 swifter

swifter 可以自动选择最优的执行方式(串行/并行):

1
2
3
4
import swifter

# 安装:pip install swifter
df['result'] = df['A'].swifter.apply(complex_operation)

5. 多进程处理

使用 Python 的 multiprocessing 进行并行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Pool
import numpy as np

def parallel_process(df_split):
return df_split.apply(complex_operation)

def parallel_dataframe(df, func, n_cores=4):
df_split = np.array_split(df, n_cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df

# 使用示例
result = parallel_dataframe(df, parallel_process)

6. 性能优化建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 1. 使用适当的数据类型
df = df.astype({
'int_col': 'int32', # 使用较小的整数类型
'float_col': 'float32', # 使用较小的浮点数类型
'cat_col': 'category' # 分类数据使用category类型
})

# 2. 使用向量化操作替代循环
# 不推荐
for i in range(len(df)):
df.iloc[i, 0] = df.iloc[i, 0] * 2

# 推荐
df.iloc[:, 0] = df.iloc[:, 0] * 2

# 3. 预先分配内存
# 不推荐
df = pd.DataFrame()
for i in range(1000):
df = pd.concat([df, pd.DataFrame([i])])

# 推荐
df = pd.DataFrame(index=range(1000))
for i in range(1000):
df.iloc[i] = i

7. 性能对比示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time

def benchmark_processing_methods():
df = pd.DataFrame({'A': range(100000)})

# 串行处理
start = time.time()
df['serial'] = df['A'].apply(complex_operation)
serial_time = time.time() - start

# pandarallel
start = time.time()
df['parallel'] = df['A'].parallel_apply(complex_operation)
parallel_time = time.time() - start

# swifter
start = time.time()
df['swifter'] = df['A'].swifter.apply(complex_operation)
swifter_time = time.time() - start

print(f"串行处理时间: {serial_time:.2f}秒")
print(f"Pandarallel处理时间: {parallel_time:.2f}秒")
print(f"Swifter处理时间: {swifter_time:.2f}秒")

benchmark_processing_methods()

注意事项:

  1. 并行处理并不总是最优选择,对于小数据集,开销可能大于收益
  2. 多进程处理会占用更多内存
  3. 某些操作本身就是并行优化的(如 groupby 操作)
  4. 在使用并行处理时要注意数据的线程安全性