Code Highlighting Test
Testing syntax highlighting across a range of programming languages, including Python, JavaScript, C++, and CUDA.
This post tests syntax highlighting for code in a variety of programming languages.
Python Code
Basic Python Code
import numpy as np
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel

class TransformerModel(nn.Module):
    def __init__(self, model_name: str, num_classes: int):
        super().__init__()
        self.backbone = AutoModel.from_pretrained(model_name)
        self.classifier = nn.Linear(self.backbone.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask=None):
        outputs = self.backbone(input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        return self.classifier(pooled_output)

# Initialize the model
model = TransformerModel("bert-base-uncased", num_classes=2)
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Example training loop
def train_epoch(model, dataloader, optimizer, criterion):
    model.train()
    total_loss = 0
    for batch_idx, (texts, labels) in enumerate(dataloader):
        # Encode the text
        encoded = tokenizer(
            texts,
            padding=True,
            truncation=True,
            return_tensors="pt",
            max_length=512
        )
        # Forward pass
        outputs = model(encoded['input_ids'], encoded['attention_mask'])
        loss = criterion(outputs, labels)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        if batch_idx % 100 == 0:
            print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')
    return total_loss / len(dataloader)
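To make the block above runnable end to end, here is a minimal usage sketch. The toy dataset, the collate function, the AdamW hyperparameters, and the cross-entropy loss are illustrative assumptions rather than part of the original example; it reuses the model, tokenizer, and train_epoch defined above.

from torch.utils.data import DataLoader

# Hypothetical toy dataset of (text, label) pairs, just enough to exercise train_epoch().
train_data = [("great movie", 1), ("terrible plot", 0)] * 8

def collate(batch):
    texts, labels = zip(*batch)
    return list(texts), torch.tensor(labels)

train_loader = DataLoader(train_data, batch_size=4, shuffle=True, collate_fn=collate)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)  # assumed hyperparameters
criterion = nn.CrossEntropyLoss()

for epoch in range(3):
    avg_loss = train_epoch(model, train_loader, optimizer, criterion)
    print(f"Epoch {epoch}: average loss {avg_loss:.4f}")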
PPO Algorithm Implementation
import torch
import torch.nn.functional as F
from torch.distributions import Categorical

class PPOAgent:
    def __init__(self, policy_net, value_net, lr=3e-4, eps_clip=0.2):
        self.policy_net = policy_net
        self.value_net = value_net
        self.optimizer = torch.optim.Adam(
            list(policy_net.parameters()) + list(value_net.parameters()),
            lr=lr
        )
        self.eps_clip = eps_clip

    def compute_advantages(self, rewards, values, gamma=0.99, lambda_=0.95):
        """Compute GAE advantages; `values` must contain one extra bootstrap value."""
        advantages = []
        gae = 0
        for i in reversed(range(len(rewards))):
            delta = rewards[i] + gamma * values[i + 1] - values[i]
            gae = delta + gamma * lambda_ * gae
            advantages.insert(0, gae)
        return torch.tensor(advantages, dtype=torch.float32)

    def update(self, states, actions, old_log_probs, rewards, values):
        """PPO update"""
        advantages = self.compute_advantages(rewards, values)
        # Value targets: returns = advantages + baseline values (before normalization)
        returns = advantages + torch.as_tensor(values[:-1], dtype=torch.float32)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        for _ in range(4):  # PPO epochs
            # New policy output
            logits = self.policy_net(states)
            dist = Categorical(F.softmax(logits, dim=-1))
            new_log_probs = dist.log_prob(actions)
            # Importance-sampling ratio
            ratio = torch.exp(new_log_probs - old_log_probs)
            # PPO clipped surrogate loss
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            policy_loss = -torch.min(surr1, surr2).mean()
            # Value-function loss
            new_values = self.value_net(states).squeeze()
            value_loss = F.mse_loss(new_values, returns)
            # Total loss
            total_loss = policy_loss + 0.5 * value_loss
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()
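A quick smoke test of PPOAgent could look like the sketch below. The network sizes and the synthetic rollout are made-up illustrative values; in a real setting the rollout would come from an environment, and the extra entry in values is the bootstrap value that the GAE computation expects.

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical

# Toy policy/value networks; the dimensions are arbitrary illustrative choices.
obs_dim, n_actions, T = 4, 2, 8
policy_net = nn.Sequential(nn.Linear(obs_dim, 32), nn.Tanh(), nn.Linear(32, n_actions))
value_net = nn.Sequential(nn.Linear(obs_dim, 32), nn.Tanh(), nn.Linear(32, 1))
agent = PPOAgent(policy_net, value_net)

# Synthetic rollout: T transitions plus one bootstrap value for GAE.
states = torch.randn(T, obs_dim)
actions = torch.randint(0, n_actions, (T,))
rewards = torch.randn(T)
values = torch.randn(T + 1)  # includes the bootstrap value V(s_T)
with torch.no_grad():
    old_log_probs = Categorical(F.softmax(policy_net(states), dim=-1)).log_prob(actions)

agent.update(states, actions, old_log_probs, rewards, values)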
JavaScript/TypeScript Code
React Component
import React, { useState, useEffect, useCallback } from 'react';
import { debounce } from 'lodash';
interface SearchProps {
onSearch: (query: string) => Promise<SearchResult[]>;
placeholder?: string;
delay?: number;
}
interface SearchResult {
id: string;
title: string;
description: string;
url: string;
}
const SearchComponent: React.FC<SearchProps> = ({
onSearch,
placeholder = "Search...",
delay = 300
}) => {
const [query, setQuery] = useState<string>('');
const [results, setResults] = useState<SearchResult[]>([]);
const [loading, setLoading] = useState<boolean>(false);
const [error, setError] = useState<string | null>(null);
// Debounced search function
const debouncedSearch = useCallback(
debounce(async (searchQuery: string) => {
if (!searchQuery.trim()) {
setResults([]);
return;
}
setLoading(true);
setError(null);
try {
const searchResults = await onSearch(searchQuery);
setResults(searchResults);
} catch (err) {
setError(err instanceof Error ? err.message : 'Search failed');
setResults([]);
} finally {
setLoading(false);
}
}, delay),
[onSearch, delay]
);
useEffect(() => {
debouncedSearch(query);
return () => debouncedSearch.cancel();
}, [query, debouncedSearch]);
const handleInputChange = (e: React.ChangeEvent<HTMLInputElement>) => {
setQuery(e.target.value);
};
return (
<div className="search-container">
<div className="search-input-wrapper">
<input
type="text"
value={query}
onChange={handleInputChange}
placeholder={placeholder}
className="search-input"
/>
{loading && <div className="search-spinner">🔄</div>}
</div>
{error && (
<div className="search-error">
Error: {error}
</div>
)}
<div className="search-results">
{results.map((result) => (
<div key={result.id} className="search-result-item">
<h3>{result.title}</h3>
<p>{result.description}</p>
<a href={result.url} target="_blank" rel="noopener noreferrer">
View details
</a>
</div>
))}
</div>
</div>
);
};
export default SearchComponent;
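In practice the component only needs an onSearch callback that resolves to a list of SearchResult objects (typically a thin wrapper around a search API), plus the optional placeholder and delay props; the debounced callback and its cancellation in useEffect are what keep a request from firing on every keystroke.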
Node.js API
const express = require('express');
const { body, validationResult } = require('express-validator');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const app = express();
// Security middleware
app.use(helmet());
app.use(express.json({ limit: '10mb' }));
// Rate-limiting middleware
const apiLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // at most 100 requests per window
message: 'Too many requests, please try again later'
});
app.use('/api/', apiLimiter);
// Model inference API
app.post('/api/inference',
[
body('text').isString().isLength({ min: 1, max: 1000 }),
body('model').optional().isString(),
body('temperature').optional().isFloat({ min: 0, max: 2 })
],
async (req, res) => {
const errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({ errors: errors.array() });
}
const { text, model = 'gpt-3.5-turbo', temperature = 0.7 } = req.body;
try {
// Simulated LLM inference
const response = await callLLMAPI({
prompt: text,
model: model,
temperature: temperature,
max_tokens: 500
});
res.json({
success: true,
data: {
text: response.text,
usage: response.usage,
model: model
}
});
} catch (error) {
console.error('Inference error:', error);
res.status(500).json({
success: false,
error: 'Model inference failed'
});
}
}
);
async function callLLMAPI(params) {
// The real LLM API call would go here
return new Promise((resolve) => {
setTimeout(() => {
resolve({
text: `AI reply: ${params.prompt}`,
usage: { prompt_tokens: 10, completion_tokens: 20 }
});
}, 1000);
});
}
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server listening on port ${PORT}`);
});
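For completeness, a small Python client for the endpoint above might look like this; the host and port follow the server's defaults, and the prompt and temperature are arbitrary example values.

import requests

# Assumes the Express service above is running locally on its default port 3000.
resp = requests.post(
    "http://localhost:3000/api/inference",
    json={"text": "Summarize PPO in one sentence.", "temperature": 0.3},
    timeout=30,
)
resp.raise_for_status()
payload = resp.json()
print(payload["data"]["text"])
print(payload["data"]["usage"])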
C++ Code
CUDA Matrix Multiplication
#include <cuda_runtime.h>
#include <cublas_v2.h>
#include <iostream>
#include <vector>
#include <chrono>
#include <cstdlib>
#include <stdexcept>
#include <string>
// CUDA kernel: naive matrix multiplication
__global__ void matmul_naive(const float* A, const float* B, float* C,
int M, int N, int K) {
int row = blockIdx.y * blockDim.y + threadIdx.y;
int col = blockIdx.x * blockDim.x + threadIdx.x;
if (row < M && col < N) {
float sum = 0.0f;
for (int k = 0; k < K; ++k) {
sum += A[row * K + k] * B[k * N + col];
}
C[row * N + col] = sum;
}
}
// Optimized version using shared memory
__global__ void matmul_shared(const float* A, const float* B, float* C,
int M, int N, int K) {
const int TILE_SIZE = 16;
__shared__ float As[TILE_SIZE][TILE_SIZE];
__shared__ float Bs[TILE_SIZE][TILE_SIZE];
int row = blockIdx.y * TILE_SIZE + threadIdx.y;
int col = blockIdx.x * TILE_SIZE + threadIdx.x;
int tx = threadIdx.x;
int ty = threadIdx.y;
float sum = 0.0f;
for (int tile = 0; tile < (K + TILE_SIZE - 1) / TILE_SIZE; ++tile) {
// Load tiles into shared memory
int a_col = tile * TILE_SIZE + tx;
int b_row = tile * TILE_SIZE + ty;
As[ty][tx] = (row < M && a_col < K) ? A[row * K + a_col] : 0.0f;
Bs[ty][tx] = (b_row < K && col < N) ? B[b_row * N + col] : 0.0f;
__syncthreads();
// Accumulate partial products
for (int k = 0; k < TILE_SIZE; ++k) {
sum += As[ty][k] * Bs[k][tx];
}
__syncthreads();
}
if (row < M && col < N) {
C[row * N + col] = sum;
}
}
class GPUMatMul {
private:
cublasHandle_t cublas_handle;
float *d_A, *d_B, *d_C;
int M, N, K;
public:
GPUMatMul(int m, int n, int k) : M(m), N(n), K(k) {
// Initialize cuBLAS
cublasCreate(&cublas_handle);
// Allocate GPU memory
cudaMalloc(&d_A, M * K * sizeof(float));
cudaMalloc(&d_B, K * N * sizeof(float));
cudaMalloc(&d_C, M * N * sizeof(float));
// Check for CUDA errors
checkCudaError();
}
~GPUMatMul() {
cudaFree(d_A);
cudaFree(d_B);
cudaFree(d_C);
cublasDestroy(cublas_handle);
}
void compute_cublas(const std::vector<float>& A, const std::vector<float>& B,
std::vector<float>& C) {
// Copy input data to the GPU
cudaMemcpy(d_A, A.data(), M * K * sizeof(float), cudaMemcpyHostToDevice);
cudaMemcpy(d_B, B.data(), K * N * sizeof(float), cudaMemcpyHostToDevice);
const float alpha = 1.0f, beta = 0.0f;
// cuBLAS matrix multiplication: C = alpha * A * B + beta * C
cublasSgemm(cublas_handle, CUBLAS_OP_N, CUBLAS_OP_N,
N, M, K,
&alpha,
d_B, N,
d_A, K,
&beta,
d_C, N);
// Copy the result back to the CPU
cudaMemcpy(C.data(), d_C, M * N * sizeof(float), cudaMemcpyDeviceToHost);
checkCudaError();
}
void compute_custom(const std::vector<float>& A, const std::vector<float>& B,
std::vector<float>& C) {
// Copy input data to the GPU
cudaMemcpy(d_A, A.data(), M * K * sizeof(float), cudaMemcpyHostToDevice);
cudaMemcpy(d_B, B.data(), K * N * sizeof(float), cudaMemcpyHostToDevice);
// Launch the kernel
dim3 blockSize(16, 16);
dim3 gridSize((N + blockSize.x - 1) / blockSize.x,
(M + blockSize.y - 1) / blockSize.y);
matmul_shared<<<gridSize, blockSize>>>(d_A, d_B, d_C, M, N, K);
// Copy the result back to the CPU
cudaMemcpy(C.data(), d_C, M * N * sizeof(float), cudaMemcpyDeviceToHost);
checkCudaError();
}
private:
void checkCudaError() {
cudaError_t error = cudaGetLastError();
if (error != cudaSuccess) {
throw std::runtime_error("CUDA error: " + std::string(cudaGetErrorString(error)));
}
}
};
// Performance test
int main() {
const int M = 1024, N = 1024, K = 1024;
// Initialize data
std::vector<float> A(M * K), B(K * N), C_cublas(M * N), C_custom(M * N);
// Random initialization
std::srand(42);
for (int i = 0; i < M * K; ++i) A[i] = static_cast<float>(std::rand()) / RAND_MAX;
for (int i = 0; i < K * N; ++i) B[i] = static_cast<float>(std::rand()) / RAND_MAX;
GPUMatMul gpu_matmul(M, N, K);
// Benchmark cuBLAS
auto start = std::chrono::high_resolution_clock::now();
gpu_matmul.compute_cublas(A, B, C_cublas);
auto end = std::chrono::high_resolution_clock::now();
auto cublas_time = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
// Benchmark the custom kernel
start = std::chrono::high_resolution_clock::now();
gpu_matmul.compute_custom(A, B, C_custom);
end = std::chrono::high_resolution_clock::now();
auto custom_time = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::cout << "cuBLAS time: " << cublas_time.count() << " μs" << std::endl;
std::cout << "Custom kernel time: " << custom_time.count() << " μs" << std::endl;
return 0;
}
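To build the benchmark, something along the lines of nvcc -O3 matmul.cu -lcublas -o matmul should work (the source file name is an assumption); both the custom kernels and the cuBLAS path require the CUDA toolkit with the cuBLAS library available.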
Shell Scripts
Deployment Script
#!/bin/bash
# Model deployment script
set -euo pipefail
# Configuration variables
MODEL_NAME="${1:-llama-7b}"
DEPLOYMENT_ENV="${2:-staging}"
VERSION="${3:-latest}"
DOCKER_REGISTRY="your-registry.com"
NAMESPACE="llm-inference"
# Colored output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Check dependencies
check_dependencies() {
log_info "Checking dependencies..."
commands=("docker" "kubectl" "helm")
for cmd in "${commands[@]}"; do
if ! command -v "$cmd" &> /dev/null; then
log_error "$cmd is not installed"
exit 1
fi
done
log_info "Dependency check complete"
}
# Build the Docker image
build_image() {
log_info "Building Docker image..."
IMAGE_TAG="${DOCKER_REGISTRY}/${MODEL_NAME}:${VERSION}"
docker build \
--build-arg MODEL_NAME="${MODEL_NAME}" \
--build-arg VERSION="${VERSION}" \
-t "${IMAGE_TAG}" \
-f docker/Dockerfile .
log_info "Image build complete: ${IMAGE_TAG}"
# Push the image
docker push "${IMAGE_TAG}"
log_info "Image push complete"
}
# Deploy to Kubernetes
deploy_to_k8s() {
log_info "Deploying to Kubernetes..."
# Create the namespace (if it does not already exist)
kubectl create namespace "${NAMESPACE}" --dry-run=client -o yaml | kubectl apply -f -
# Deploy with Helm
helm upgrade --install "${MODEL_NAME}" \
./helm/llm-inference \
--namespace "${NAMESPACE}" \
--set image.repository="${DOCKER_REGISTRY}/${MODEL_NAME}" \
--set image.tag="${VERSION}" \
--set environment="${DEPLOYMENT_ENV}" \
--set model.name="${MODEL_NAME}" \
--timeout 600s
log_info "Deployment complete"
}
# Health check
health_check() {
log_info "Running health checks..."
SERVICE_URL="http://$(kubectl get svc ${MODEL_NAME} -n ${NAMESPACE} -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):8000"
# Wait for the service to become ready
for i in {1..30}; do
if curl -f "${SERVICE_URL}/health" &> /dev/null; then
log_info "Service health check passed"
break
fi
if [ $i -eq 30 ]; then
log_error "Service health check failed"
exit 1
fi
log_warn "Waiting for the service to become ready... ($i/30)"
sleep 10
done
# Test the inference endpoint
log_info "Testing the inference endpoint..."
curl -X POST "${SERVICE_URL}/inference" \
-H "Content-Type: application/json" \
-d '{"text": "test input", "max_tokens": 50}' \
| jq .
}
# Cleanup function
cleanup() {
if [ $? -ne 0 ]; then
log_error "Deployment failed, cleaning up..."
helm uninstall "${MODEL_NAME}" -n "${NAMESPACE}" || true
fi
}
# Main function
main() {
log_info "Starting deployment of ${MODEL_NAME} (${VERSION}) to the ${DEPLOYMENT_ENV} environment"
trap cleanup EXIT
check_dependencies
build_image
deploy_to_k8s
health_check
log_info "Deployment finished successfully!"
log_info "Service URL: ${SERVICE_URL}"
}
# Script entry point
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi
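For reference, the script takes up to three positional arguments (model name, environment, version), e.g. bash deploy.sh llama-7b production v1.2.3 with an assumed file name; any argument left out falls back to the defaults declared at the top: llama-7b, staging, and latest.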
SQL Queries
Complex Analytical Query
-- User behavior analysis query
WITH user_sessions AS (
SELECT
user_id,
session_id,
DATE(created_at) as session_date,
COUNT(*) as interaction_count,
AVG(response_time) as avg_response_time,
SUM(CASE WHEN feedback_score >= 4 THEN 1 ELSE 0 END) as positive_feedback
FROM user_interactions
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY user_id, session_id, DATE(created_at)
),
daily_metrics AS (
SELECT
session_date,
COUNT(DISTINCT user_id) as daily_active_users,
COUNT(DISTINCT session_id) as total_sessions,
AVG(interaction_count) as avg_interactions_per_session,
AVG(avg_response_time) as avg_response_time,
SUM(positive_feedback) * 100.0 / SUM(interaction_count) as satisfaction_rate
FROM user_sessions
GROUP BY session_date
),
model_performance AS (
SELECT
model_version,
DATE(created_at) as usage_date,
COUNT(*) as total_requests,
AVG(latency_ms) as avg_latency,
AVG(token_count) as avg_tokens,
SUM(CASE WHEN error_code IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as success_rate
FROM model_requests
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY model_version, DATE(created_at)
)
SELECT
dm.session_date,
dm.daily_active_users,
dm.total_sessions,
ROUND(dm.avg_interactions_per_session, 2) as avg_interactions,
ROUND(dm.avg_response_time, 2) as avg_response_time_sec,
ROUND(dm.satisfaction_rate, 2) as satisfaction_rate_pct,
mp.model_version,
mp.total_requests,
ROUND(mp.avg_latency, 2) as avg_latency_ms,
ROUND(mp.success_rate, 2) as success_rate_pct
FROM daily_metrics dm
LEFT JOIN model_performance mp ON dm.session_date = mp.usage_date
ORDER BY dm.session_date DESC, mp.total_requests DESC;
-- Model A/B test analysis
SELECT
experiment_group,
COUNT(DISTINCT user_id) as unique_users,
COUNT(*) as total_interactions,
AVG(response_quality_score) as avg_quality_score,
AVG(user_satisfaction_score) as avg_satisfaction,
AVG(task_completion_rate) as completion_rate,
STDDEV(response_quality_score) as quality_stddev
FROM ab_test_results
WHERE experiment_name = 'llm_model_comparison'
AND created_at >= '2024-01-01'
GROUP BY experiment_group;
-- Statistical significance check: a two-sample t-statistic needs both groups,
-- so it is computed over the whole experiment rather than per group
SELECT
(AVG(CASE WHEN experiment_group = 'A' THEN response_quality_score END) -
AVG(CASE WHEN experiment_group = 'B' THEN response_quality_score END)) /
SQRT(VAR_POP(response_quality_score) * (1.0/COUNT(CASE WHEN experiment_group = 'A' THEN 1 END) +
1.0/COUNT(CASE WHEN experiment_group = 'B' THEN 1 END))) as t_statistic
FROM ab_test_results
WHERE experiment_name = 'llm_model_comparison'
AND created_at >= '2024-01-01';
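As a rough sketch of how these queries might be run from Python, the snippet below uses pandas with SQLAlchemy; the connection string and the file holding the daily-metrics query are hypothetical.

import pandas as pd
from sqlalchemy import create_engine, text

# Hypothetical Postgres connection string; replace with your own credentials.
engine = create_engine("postgresql+psycopg2://analytics:password@localhost:5432/llm_metrics")

# Assumes the daily-metrics query above has been saved to this (hypothetical) file.
with open("queries/daily_metrics.sql") as f:
    daily_metrics_sql = f.read()

with engine.connect() as conn:
    df = pd.read_sql_query(text(daily_metrics_sql), conn)

print(df.head())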
Configuration Files
Docker Compose
# docker-compose.yml
version: '3.8'

services:
  llm-api:
    build:
      context: .
      dockerfile: Dockerfile
      args:
        MODEL_SIZE: 7b
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/models/llama-7b
      - CUDA_VISIBLE_DEVICES=0,1
      - BATCH_SIZE=16
      - MAX_SEQUENCE_LENGTH=2048
    volumes:
      - ./models:/models:ro
      - ./logs:/app/logs
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 2
              capabilities: [gpu]
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru

  monitoring:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'

volumes:
  redis_data:
  prometheus_data:

networks:
  default:
    driver: bridge
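With the NVIDIA Container Toolkit installed, docker compose up -d brings up all three services; the deploy.resources.reservations.devices block is what reserves the two GPUs for the llm-api container.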
Code highlighting test complete! A wide range of programming languages and syntax highlighting styles are covered.