事务处理

什么是事务

事务(Transaction)是一组SQL操作的集合,这些操作要么全部成功执行,要么全部失败回滚。事务是数据库管理系统保证数据一致性和完整性的重要机制。

事务的ACID特性

  1. 原子性(Atomicity):事务中的所有操作是一个不可分割的整体
  2. 一致性(Consistency):事务执行前后,数据库都处于一致状态
  3. 隔离性(Isolation):多个事务并发执行时互不干扰
  4. 持久性(Durability):事务提交后,对数据的修改是永久性的

事务的基本语法

<?php
try {
    // 开始事务
    $pdo->beginTransaction();

    // 执行一系列SQL操作
    $pdo->exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
    $pdo->exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2");

    // 提交事务
    $pdo->commit();
    echo "转账成功!";

} catch (Exception $e) {
    // 发生错误时回滚事务
    $pdo->rollback();
    echo "转账失败:" . $e->getMessage();
}
?>

事务示例:银行转账系统

创建数据表

<?php
// 创建账户表
$createTableSQL = "
CREATE TABLE IF NOT EXISTS accounts (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    balance DECIMAL(10, 2) NOT NULL DEFAULT 0.00,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)";

$pdo->exec($createTableSQL);

// 创建转账记录表
$createTransferTableSQL = "
CREATE TABLE IF NOT EXISTS transfers (
    id INT AUTO_INCREMENT PRIMARY KEY,
    from_account_id INT NOT NULL,
    to_account_id INT NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    description VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (from_account_id) REFERENCES accounts(id),
    FOREIGN KEY (to_account_id) REFERENCES accounts(id)
)";

$pdo->exec($createTransferTableSQL);

// 插入测试数据
$pdo->exec("INSERT INTO accounts (name, balance) VALUES
    ('张三', 1000.00),
    ('李四', 500.00),
    ('王五', 2000.00)");
?>

实现转账功能

<?php
class BankService {
    private $pdo;

    public function __construct($pdo) {
        $this->pdo = $pdo;
    }

    /**
     * 转账操作
     * @param int $fromAccountId 转出账户ID
     * @param int $toAccountId 转入账户ID
     * @param float $amount 转账金额
     * @param string $description 转账说明
     * @return bool 是否成功
     */
    public function transfer($fromAccountId, $toAccountId, $amount, $description = '') {
        // 验证参数
        if ($amount <= 0) {
            throw new Exception("转账金额必须大于0");
        }

        if ($fromAccountId == $toAccountId) {
            throw new Exception("不能向自己转账");
        }

        try {
            // 开始事务
            $this->pdo->beginTransaction();

            // 1. 检查转出账户余额
            $checkBalance = $this->pdo->prepare(
                "SELECT balance FROM accounts WHERE id = ? FOR UPDATE"
            );
            $checkBalance->execute([$fromAccountId]);
            $fromAccount = $checkBalance->fetch();

            if (!$fromAccount) {
                throw new Exception("转出账户不存在");
            }

            if ($fromAccount['balance'] < $amount) {
                throw new Exception("余额不足");
            }

            // 2. 检查转入账户
            $checkToAccount = $this->pdo->prepare(
                "SELECT id FROM accounts WHERE id = ?"
            );
            $checkToAccount->execute([$toAccountId]);
            if (!$checkToAccount->fetch()) {
                throw new Exception("转入账户不存在");
            }

            // 3. 从转出账户扣款
            $debitSQL = $this->pdo->prepare(
                "UPDATE accounts SET balance = balance - ? WHERE id = ?"
            );
            $debitSQL->execute([$amount, $fromAccountId]);

            // 4. 向转入账户存款
            $creditSQL = $this->pdo->prepare(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?"
            );
            $creditSQL->execute([$amount, $toAccountId]);

            // 5. 记录转账日志
            $logSQL = $this->pdo->prepare(
                "INSERT INTO transfers (from_account_id, to_account_id, amount, description)
                 VALUES (?, ?, ?, ?)"
            );
            $logSQL->execute([$fromAccountId, $toAccountId, $amount, $description]);

            // 提交事务
            $this->pdo->commit();

            return true;

        } catch (Exception $e) {
            // 回滚事务
            $this->pdo->rollback();
            throw $e;
        }
    }

    /**
     * 查询账户余额
     */
    public function getBalance($accountId) {
        $sql = "SELECT id, name, balance FROM accounts WHERE id = ?";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$accountId]);
        return $stmt->fetch();
    }

    /**
     * 查询转账记录
     */
    public function getTransfers($accountId, $limit = 10) {
        $sql = "
            SELECT t.*,
                   f.name as from_account_name,
                   t.name as to_account_name
            FROM transfers t
            JOIN accounts f ON t.from_account_id = f.id
            JOIN accounts t ON t.to_account_id = t.id
            WHERE t.from_account_id = ? OR t.to_account_id = ?
            ORDER BY t.created_at DESC
            LIMIT ?
        ";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$accountId, $accountId, $limit]);
        return $stmt->fetchAll();
    }
}

// 使用示例
$bank = new BankService($pdo);

// 转账
try {
    $success = $bank->transfer(1, 2, 200.00, "还款");
    if ($success) {
        echo "转账成功!\n";
    }
} catch (Exception $e) {
    echo "转账失败:" . $e->getMessage() . "\n";
}

// 查询余额
$account1 = $bank->getBalance(1);
$account2 = $bank->getBalance(2);

echo "账户1余额:" . $account1['balance'] . "\n";
echo "账户2余额:" . $account2['balance'] . "\n";
?>

复杂事务示例:订单处理系统

创建数据表

<?php
// 商品表
$pdo->exec("
    CREATE TABLE IF NOT EXISTS products (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        price DECIMAL(10, 2) NOT NULL,
        stock INT NOT NULL DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
");

// 订单表
$pdo->exec("
    CREATE TABLE IF NOT EXISTS orders (
        id INT AUTO_INCREMENT PRIMARY KEY,
        user_id INT NOT NULL,
        total_amount DECIMAL(10, 2) NOT NULL,
        status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    )
");

// 订单详情表
$pdo->exec("
    CREATE TABLE IF NOT EXISTS order_items (
        id INT AUTO_INCREMENT PRIMARY KEY,
        order_id INT NOT NULL,
        product_id INT NOT NULL,
        quantity INT NOT NULL,
        price DECIMAL(10, 2) NOT NULL,
        FOREIGN KEY (order_id) REFERENCES orders(id),
        FOREIGN KEY (product_id) REFERENCES products(id)
    )
");

// 插入测试商品
$pdo->exec("INSERT INTO products (name, price, stock) VALUES
    ('iPhone 15', 6999.00, 100),
    ('MacBook Pro', 12999.00, 50),
    ('AirPods Pro', 1999.00, 200)");
?>

订单处理类

<?php
class OrderService {
    private $pdo;

    public function __construct($pdo) {
        $this->pdo = $pdo;
    }

    /**
     * 创建订单
     */
    public function createOrder($userId, $items) {
        try {
            $this->pdo->beginTransaction();

            // 1. 验证库存并计算总价
            $totalAmount = 0;
            $validatedItems = [];

            foreach ($items as $item) {
                // 检查商品是否存在
                $productSQL = $this->pdo->prepare(
                    "SELECT id, name, price, stock FROM products WHERE id = ? FOR UPDATE"
                );
                $productSQL->execute([$item['product_id']]);
                $product = $productSQL->fetch();

                if (!$product) {
                    throw new Exception("商品ID {$item['product_id']} 不存在");
                }

                if ($product['stock'] < $item['quantity']) {
                    throw new Exception("商品 {$product['name']} 库存不足");
                }

                // 计算小计
                $subtotal = $product['price'] * $item['quantity'];
                $totalAmount += $subtotal;

                $validatedItems[] = [
                    'product' => $product,
                    'quantity' => $item['quantity'],
                    'subtotal' => $subtotal
                ];
            }

            // 2. 创建订单
            $orderSQL = $this->pdo->prepare(
                "INSERT INTO orders (user_id, total_amount, status) VALUES (?, ?, 'pending')"
            );
            $orderSQL->execute([$userId, $totalAmount]);
            $orderId = $this->pdo->lastInsertId();

            // 3. 创建订单详情并扣减库存
            $itemSQL = $this->pdo->prepare(
                "INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (?, ?, ?, ?)"
            );

            $stockSQL = $this->pdo->prepare(
                "UPDATE products SET stock = stock - ? WHERE id = ?"
            );

            foreach ($validatedItems as $item) {
                // 插入订单详情
                $itemSQL->execute([
                    $orderId,
                    $item['product']['id'],
                    $item['quantity'],
                    $item['product']['price']
                ]);

                // 扣减库存
                $stockSQL->execute([
                    $item['quantity'],
                    $item['product']['id']
                ]);
            }

            $this->pdo->commit();

            return [
                'order_id' => $orderId,
                'total_amount' => $totalAmount,
                'items' => $validatedItems
            ];

        } catch (Exception $e) {
            $this->pdo->rollback();
            throw $e;
        }
    }

    /**
     * 取消订单(恢复库存)
     */
    public function cancelOrder($orderId) {
        try {
            $this->pdo->beginTransaction();

            // 1. 检查订单状态
            $orderSQL = $this->pdo->prepare(
                "SELECT status FROM orders WHERE id = ? FOR UPDATE"
            );
            $orderSQL->execute([$orderId]);
            $order = $orderSQL->fetch();

            if (!$order) {
                throw new Exception("订单不存在");
            }

            if ($order['status'] !== 'pending') {
                throw new Exception("只能取消待处理的订单");
            }

            // 2. 获取订单商品并恢复库存
            $itemsSQL = $this->pdo->prepare(
                "SELECT product_id, quantity FROM order_items WHERE order_id = ?"
            );
            $itemsSQL->execute([$orderId]);
            $items = $itemsSQL->fetchAll();

            $restoreSQL = $this->pdo->prepare(
                "UPDATE products SET stock = stock + ? WHERE id = ?"
            );

            foreach ($items as $item) {
                $restoreSQL->execute([$item['quantity'], $item['product_id']]);
            }

            // 3. 更新订单状态
            $updateSQL = $this->pdo->prepare(
                "UPDATE orders SET status = 'cancelled' WHERE id = ?"
            );
            $updateSQL->execute([$orderId]);

            $this->pdo->commit();

            return true;

        } catch (Exception $e) {
            $this->pdo->rollback();
            throw $e;
        }
    }
}

// 使用示例
$orderService = new OrderService($pdo);

try {
    // 创建订单
    $order = $orderService->createOrder(1, [
        ['product_id' => 1, 'quantity' => 2],  // 2个iPhone 15
        ['product_id' => 3, 'quantity' => 1]   // 1个AirPods Pro
    ]);

    echo "订单创建成功!订单ID: {$order['order_id']}\n";
    echo "订单总金额: ¥{$order['total_amount']}\n";

} catch (Exception $e) {
    echo "创建订单失败:" . $e->getMessage() . "\n";
}
?>

事务隔离级别

设置隔离级别

<?php
// 事务隔离级别(从低到高)
$levels = [
    'READ UNCOMMITTED',  // 读未提交(脏读)
    'READ COMMITTED',    // 读已提交
    'REPEATABLE READ',   // 可重复读(MySQL默认)
    'SERIALIZABLE'       // 串行化
];

// 设置隔离级别
$pdo->exec("SET TRANSACTION ISOLATION LEVEL READ COMMITTED");
$pdo->beginTransaction();

// 执行事务操作
$pdo->commit();
?>

不同隔离级别的影响

<?php
// 场景:两个事务同时操作同一数据

// 事务1:更新商品价格
function updatePrice($pdo, $productId, $newPrice) {
    $pdo->exec("SET TRANSACTION ISOLATION LEVEL READ COMMITTED");
    $pdo->beginTransaction();

    try {
        // 读取当前价格
        $stmt = $pdo->prepare("SELECT price FROM products WHERE id = ?");
        $stmt->execute([$productId]);
        $oldPrice = $stmt->fetch()['price'];

        // 模拟耗时操作
        sleep(2);

        // 更新价格
        $update = $pdo->prepare("UPDATE products SET price = ? WHERE id = ?");
        $update->execute([$newPrice, $productId]);

        echo "商品价格从 $oldPrice 更新到 $newPrice\n";
        $pdo->commit();

    } catch (Exception $e) {
        $pdo->rollback();
        echo "更新失败:" . $e->getMessage() . "\n";
    }
}

// 事务2:查询商品信息
function getProductInfo($pdo, $productId) {
    // 不开启事务,使用默认隔离级别
    $stmt = $pdo->prepare("SELECT * FROM products WHERE id = ?");
    $stmt->execute([$productId]);
    return $stmt->fetch();
}
?>

事务最佳实践

1. 保持事务简短

<?php
// ✅ 好的做法:事务中只包含必要的数据库操作
function transferMoney($pdo, $from, $to, $amount) {
    $pdo->beginTransaction();
    try {
        // 核心数据库操作
        $pdo->prepare("UPDATE accounts SET balance = balance - ? WHERE id = ?")
           ->execute([$amount, $from]);
        $pdo->prepare("UPDATE accounts SET balance = balance + ? WHERE id = ?")
           ->execute([$amount, $to]);

        $pdo->commit();
    } catch (Exception $e) {
        $pdo->rollback();
        throw $e;
    }
}

// ❌ 避免:在事务中执行耗时操作
function transferMoneyWithDelay($pdo, $from, $to, $amount) {
    $pdo->beginTransaction();
    try {
        $pdo->prepare("UPDATE accounts SET balance = balance - ? WHERE id = ?")
           ->execute([$amount, $from]);

        // 不要在事务中执行耗时操作
        sleep(10);  // 不好的做法
        sendEmail(); // 不好的做法

        $pdo->prepare("UPDATE accounts SET balance = balance + ? WHERE id = ?")
           ->execute([$amount, $to]);

        $pdo->commit();
    } catch (Exception $e) {
        $pdo->rollback();
        throw $e;
    }
}
?>

2. 合理使用索引和锁

<?php
// ✅ 使用索引优化查询
function updateUserBalance($pdo, $userId, $amount) {
    $pdo->beginTransaction();
    try {
        // 使用主键索引,快速锁定记录
        $sql = "SELECT balance FROM users WHERE id = ? FOR UPDATE";
        $stmt = $pdo->prepare($sql);
        $stmt->execute([$userId]);

        // 更新余额
        $update = $pdo->prepare("UPDATE users SET balance = ? WHERE id = ?");
        $update->execute([$amount, $userId]);

        $pdo->commit();
    } catch (Exception $e) {
        $pdo->rollback();
        throw $e;
    }
}

// ❌ 避免全表扫描
function updateUsersByCondition($pdo, $condition) {
    $pdo->beginTransaction();
    try {
        // 没有使用索引,可能锁定大量记录
        $sql = "UPDATE users SET status = 'active' WHERE $condition";
        $pdo->exec($sql);

        $pdo->commit();
    } catch (Exception $e) {
        $pdo->rollback();
        throw $e;
    }
}
?>

3. 错误处理和重试机制

<?php
class TransactionManager {
    private $pdo;
    private $maxRetries = 3;

    public function __construct($pdo) {
        $this->pdo = $pdo;
    }

    /**
     * 执行事务(带重试机制)
     */
    public function executeTransaction(callable $callback) {
        $retries = 0;

        while ($retries < $this->maxRetries) {
            try {
                $this->pdo->beginTransaction();

                $result = $callback($this->pdo);

                $this->pdo->commit();
                return $result;

            } catch (PDOException $e) {
                $this->pdo->rollback();

                // 死锁错误,可以重试
                if ($e->errorInfo[1] == 1213 && $retries < $this->maxRetries - 1) {
                    $retries++;
                    $waitTime = rand(100, 500) * 1000; // 100-500ms
                    usleep($waitTime);
                    continue;
                }

                throw $e;
            }
        }

        throw new Exception("事务执行失败,已重试{$this->maxRetries}次");
    }
}

// 使用示例
$txManager = new TransactionManager($pdo);

try {
    $result = $txManager->executeTransaction(function($pdo) {
        $pdo->exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
        $pdo->exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2");
        return true;
    });

    echo "转账成功!\n";
} catch (Exception $e) {
    echo "操作失败:" . $e->getMessage() . "\n";
}
?>

4. 嵌套事务(模拟)

<?php
class NestedTransaction {
    private $pdo;
    private $level = 0;

    public function __construct($pdo) {
        $this->pdo = $pdo;
    }

    public function beginTransaction() {
        if ($this->level == 0) {
            $this->pdo->beginTransaction();
        }
        $this->level++;
    }

    public function commit() {
        $this->level--;
        if ($this->level == 0) {
            $this->pdo->commit();
        }
    }

    public function rollback() {
        if ($this->level > 0) {
            $this->pdo->rollback();
            $this->level = 0;
        }
    }

    public function execute(callable $callback) {
        $this->beginTransaction();
        try {
            $result = $callback($this);
            $this->commit();
            return $result;
        } catch (Exception $e) {
            $this->rollback();
            throw $e;
        }
    }
}

// 使用示例
$nestedTx = new NestedTransaction($pdo);

// 外层事务
$nestedTx->execute(function($tx) use ($pdo) {
    echo "开始外层事务\n";

    // 内层事务1
    $tx->execute(function($tx) use ($pdo) {
        echo "执行内层事务1\n";
        $pdo->exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
    });

    // 内层事务2
    $tx->execute(function($tx) use ($pdo) {
        echo "执行内层事务2\n";
        $pdo->exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2");
    });

    echo "完成外层事务\n");
});
?>

事务监控和调试

记录事务日志

<?php
class TransactionLogger {
    private $pdo;
    private $logFile;

    public function __construct($pdo, $logFile = 'transaction.log') {
        $this->pdo = $pdo;
        $this->logFile = $logFile;
    }

    public function executeWithLogging(callable $callback, $description = '') {
        $startTime = microtime(true);
        $transactionId = uniqid('tx_');

        $this->log("START: $description", $transactionId);

        try {
            $this->pdo->beginTransaction();

            // 执行前记录
            $this->log("BEGIN: {$description}", $transactionId);

            $result = $callback($this->pdo);

            // 提交前记录
            $this->log("COMMIT: {$description}", $transactionId);

            $this->pdo->commit();

            // 记录执行时间
            $duration = round((microtime(true) - $startTime) * 1000, 2);
            $this->log("SUCCESS: {$description} ({$duration}ms)", $transactionId);

            return $result;

        } catch (Exception $e) {
            $this->pdo->rollback();
            $duration = round((microtime(true) - $startTime) * 1000, 2);
            $this->log("ERROR: {$description} - {$e->getMessage()} ({$duration}ms)", $transactionId);
            throw $e;
        }
    }

    private function log($message, $transactionId) {
        $timestamp = date('Y-m-d H:i:s');
        $logMessage = "[$timestamp] [$transactionId] $message\n";
        file_put_contents($this->logFile, $logMessage, FILE_APPEND);
    }
}

// 使用示例
$logger = new TransactionLogger($pdo);

$logger->executeWithLogging(function($pdo) {
    $pdo->exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
    $pdo->exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2");
    return true;
}, "转账操作");
?>

通过本节的学习,你应该掌握了事务处理的基本概念和实际应用。事务是保证数据一致性的重要机制,在处理关键业务逻辑时一定要合理使用事务来确保数据的完整性和一致性。