them tool CRUD Postgres

parent cf2f40ee
......@@ -4,4 +4,11 @@ QDRANT_HOST=localhost #192.168.0.29
QDRANT_PORT=7333
# Cấu hình cho MongoDB
MONGO_URI=mongodb://localhost:27017/
MONGO_DB_NAME=meu_rca
\ No newline at end of file
MONGO_DB_NAME=meu_rca
# Cấu hình cho PostgreSQL
PG_HOST=localhost
PG_PORT=5432
PG_USER=postgres
PG_PASSWORD=your_postgres_password_here
PG_DB_NAME=meu_postgres
\ No newline at end of file
......@@ -5,6 +5,9 @@ from datetime import datetime
import argparse
import json
import ast
import re # Dùng để kiểm tra Regex (bảo mật tên bảng)
import psycopg2
from psycopg2.extras import RealDictCursor # Giúp trả về dữ liệu dạng JSON/Dict thay vì Tuple
from bson.objectid import ObjectId
from pymongo import MongoClient
......@@ -48,6 +51,26 @@ if MONGO_URI:
except Exception as e:
print(f"Lỗi kết nối MongoDB: {e}")
# --- Config PostgreSQL ---
PG_HOST = os.getenv("PG_HOST", "localhost")
PG_PORT = os.getenv("PG_PORT", "5432")
PG_USER = os.getenv("PG_USER", "postgres")
PG_PASSWORD = os.getenv("PG_PASSWORD", "12345") # Đổi lại theo pass máy bạn
PG_DB_NAME = os.getenv("PG_DB_NAME", "meu_postgres")
pg_conn = None
try:
pg_conn = psycopg2.connect(
host=PG_HOST,
port=PG_PORT,
user=PG_USER,
password=PG_PASSWORD,
dbname=PG_DB_NAME
)
pg_conn.autocommit = True # Tự động lưu thay đổi (rất quan trọng)
except Exception as e:
pass
# MCP server
mcp = FastMCP(name="multi-project-memory")
......@@ -333,6 +356,142 @@ def delete_mongo_document(project_id: str, collection_name: str, document_id: st
return "Đã xóa (mềm) thành công document."
except Exception as e:
return f"Lỗi khi xóa: {str(e)}"
# ==========================================
# CÁC TOOL THAO TÁC VỚI POSTGRESQL (AN TOÀN)
# ==========================================
def validate_table_name(table_name: str) -> bool:
"""BẢO MẬT: Chống SQL Injection qua tên bảng. Chỉ cho phép chữ cái, số và gạch dưới."""
return bool(re.match(r'^[a-zA-Z0-9_]+$', table_name))
@mcp.tool()
def select_postgres_records(project_id: str, table_name: str, query: str = "{}", limit: int = 5) -> str:
"""Lấy dữ liệu từ PostgreSQL. Tự động filter theo project_id và bỏ qua is_deleted."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try:
query_dict = parse_json_safe(query)
# Ép buộc filter bảo mật
query_dict["project_id"] = project_id
query_dict["is_deleted"] = False
# Build câu lệnh SQL an toàn với tham số
where_clauses = []
values = []
for key, value in query_dict.items():
if not validate_table_name(key): continue # Bỏ qua tên cột nguy hiểm
where_clauses.append(f"{key} = %s")
values.append(value)
where_sql = " AND ".join(where_clauses)
safe_limit = min(max(1, limit), 20)
sql = f"SELECT * FROM {table_name} WHERE {where_sql} LIMIT {safe_limit}"
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(sql, tuple(values))
results = cursor.fetchall()
# Xử lý datetime để convert sang chuỗi JSON được
for row in results:
for k, v in row.items():
if isinstance(v, datetime): row[k] = v.isoformat()
return json.dumps(results, indent=2, ensure_ascii=False)
except Exception as e:
pg_conn.rollback()
return f"Lỗi truy vấn Postgres: {str(e)}"
@mcp.tool()
def insert_postgres_record(project_id: str, table_name: str, data: str) -> str:
"""Thêm record mới vào PostgreSQL. Tự động gán project_id và timestamp."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try:
doc_dict = parse_json_safe(data)
if not isinstance(doc_dict, dict): return "Lỗi: Data phải là JSON object."
# Ép buộc metadata
doc_dict["project_id"] = project_id
doc_dict["created_at"] = datetime.now()
doc_dict["updated_at"] = doc_dict["created_at"]
doc_dict["is_deleted"] = False
columns = []
values = []
placeholders = []
for key, value in doc_dict.items():
if not validate_table_name(key): continue
columns.append(key)
values.append(value)
placeholders.append("%s")
sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(placeholders)}) RETURNING id"
with pg_conn.cursor() as cursor:
cursor.execute(sql, tuple(values))
inserted_id = cursor.fetchone()[0]
return f"Thêm thành công! Inserted ID: {inserted_id}"
except Exception as e:
pg_conn.rollback()
return f"Lỗi insert Postgres: {str(e)}"
@mcp.tool()
def update_postgres_record(project_id: str, table_name: str, record_id: str, update_data: str) -> str:
"""Cập nhật record PostgreSQL. Yêu cầu record_id (khóa chính)."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try:
update_dict = parse_json_safe(update_data)
update_dict.pop("project_id", None)
update_dict.pop("id", None)
update_dict["updated_at"] = datetime.now()
set_clauses = []
values = []
for key, value in update_dict.items():
if not validate_table_name(key): continue
set_clauses.append(f"{key} = %s")
values.append(value)
if not set_clauses: return "Lỗi: Không có dữ liệu hợp lệ để cập nhật."
# Thêm thông tin cho WHERE clause
values.extend([record_id, project_id])
sql = f"UPDATE {table_name} SET {', '.join(set_clauses)} WHERE id = %s AND project_id = %s AND is_deleted = False"
with pg_conn.cursor() as cursor:
cursor.execute(sql, tuple(values))
if cursor.rowcount == 0:
return "Không tìm thấy record hoặc không có quyền sửa."
return "Cập nhật thành công!"
except Exception as e:
pg_conn.rollback()
return f"Lỗi update Postgres: {str(e)}"
@mcp.tool()
def delete_postgres_record(project_id: str, table_name: str, record_id: str) -> str:
"""Xóa mềm (Soft Delete) record PostgreSQL."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try:
sql = f"UPDATE {table_name} SET is_deleted = True, updated_at = %s WHERE id = %s AND project_id = %s"
with pg_conn.cursor() as cursor:
cursor.execute(sql, (datetime.now(), record_id, project_id))
if cursor.rowcount == 0:
return "Không tìm thấy record hợp lệ để xóa."
return "Đã xóa (mềm) thành công record."
except Exception as e:
pg_conn.rollback()
return f"Lỗi delete Postgres: {str(e)}"
if __name__ == "__main__":
mcp.run()
\ No newline at end of file
import os
import psycopg2
from datetime import datetime
from dotenv import load_dotenv
# Load cấu hình từ file .env
load_dotenv()
PG_HOST = os.getenv("PG_HOST", "localhost")
PG_PORT = os.getenv("PG_PORT", "5432")
PG_USER = os.getenv("PG_USER", "postgres")
PG_PASSWORD = os.getenv("PG_PASSWORD", "12345")
PG_DB_NAME = os.getenv("PG_DB_NAME", "meu_postgres")
# Dữ liệu test trộn lẫn cả 2 project để kiểm tra độ cách ly
MOCK_TASKS = [
# Data của meu-rca-nextjs
("meu-rca-nextjs", "Tối ưu hóa query MongoDB", "Team Backend", "in_progress", False, datetime.now(), datetime.now()),
("meu-rca-nextjs", "Sửa lỗi crash app trên iOS", "Team Mobile", "open", False, datetime.now(), datetime.now()),
# Data của meu-marketing
("meu-marketing", "Lên kịch bản Tiktok tuần 1", "Content Marketing", "done", False, datetime.now(), datetime.now()),
("meu-marketing", "Chạy ads Facebook tháng 4", "Digital Marketing", "open", False, datetime.now(), datetime.now()),
]
def seed_postgres():
print("⏳ Đang kết nối PostgreSQL...")
try:
conn = psycopg2.connect(
host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASSWORD, dbname=PG_DB_NAME
)
conn.autocommit = True
cursor = conn.cursor()
# 1. TẠO BẢNG (Schema) - Bắt buộc trong SQL
print("📦 Đang kiểm tra và khởi tạo bảng 'tasks'...")
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id SERIAL PRIMARY KEY,
project_id VARCHAR(50) NOT NULL,
title VARCHAR(255) NOT NULL,
team VARCHAR(100),
status VARCHAR(50),
is_deleted BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
""")
# 2. Xóa data cũ (Làm sạch bảng trước khi seed)
cursor.execute("TRUNCATE TABLE tasks RESTART IDENTITY;")
print("🧹 Đã làm sạch dữ liệu cũ.")
# 3. CHÈN DATA MỚI
insert_query = """
INSERT INTO tasks (project_id, title, team, status, is_deleted, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.executemany(insert_query, MOCK_TASKS)
print(f"✅ Đã cấy thành công {len(MOCK_TASKS)} task mẫu vào bảng 'tasks'!")
print("🚀 Hệ thống PostgreSQL đã sẵn sàng cho AI Agent!")
cursor.close()
conn.close()
except Exception as e:
print(f"❌ Lỗi khi cấy dữ liệu PostgreSQL: {e}")
print("💡 Gợi ý: Hãy chắc chắn bạn đã tạo database tên là 'meu_postgres' trong pgAdmin/DBeaver trước khi chạy file này.")
if __name__ == "__main__":
seed_postgres()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment