them tool get_postgres_foreign_keys,get_postgres_triggers, get_postgres_function_code

parent 4fca72c8
...@@ -357,7 +357,7 @@ def delete_mongo_document(project_id: str, collection_name: str, document_id: st ...@@ -357,7 +357,7 @@ def delete_mongo_document(project_id: str, collection_name: str, document_id: st
except Exception as e: except Exception as e:
return f"Lỗi khi xóa: {str(e)}" return f"Lỗi khi xóa: {str(e)}"
# ========================================== # ==========================================
# CÁC TOOL THAO TÁC VỚI POSTGRESQL (AN TOÀN) # CÁC TOOL THAO TÁC VỚI POSTGRESQL (TỔNG QUÁT & BẢO MẬT DỮ LIỆU)
# ========================================== # ==========================================
def validate_table_name(table_name: str) -> bool: def validate_table_name(table_name: str) -> bool:
...@@ -366,22 +366,14 @@ def validate_table_name(table_name: str) -> bool: ...@@ -366,22 +366,14 @@ def validate_table_name(table_name: str) -> bool:
@mcp.tool() @mcp.tool()
def list_postgres_tables(task_progress: str = "") -> str: def list_postgres_tables(task_progress: str = "") -> str:
"""Lấy danh sách tất cả các bảng (tables) hiện có trong database PostgreSQL. BẮT BUỘC KHÔNG truyền tham số nào vào tool này.""" """Lấy danh sách tất cả các bảng hiện có trong PostgreSQL."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối." if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
try: try:
sql = """ sql = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE';"
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE';
"""
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor: with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(sql) cursor.execute(sql)
results = cursor.fetchall() results = cursor.fetchall()
if not results: return "Database hiện tại chưa có bảng nào."
if not results:
return "Database hiện tại chưa có bảng nào."
return json.dumps([row['table_name'] for row in results], indent=2, ensure_ascii=False) return json.dumps([row['table_name'] for row in results], indent=2, ensure_ascii=False)
except Exception as e: except Exception as e:
pg_conn.rollback() pg_conn.rollback()
...@@ -389,50 +381,35 @@ def list_postgres_tables(task_progress: str = "") -> str: ...@@ -389,50 +381,35 @@ def list_postgres_tables(task_progress: str = "") -> str:
@mcp.tool() @mcp.tool()
def get_postgres_schema(table_name: str) -> str: def get_postgres_schema(table_name: str) -> str:
"""Lấy cấu trúc các cột của một bảng trong PostgreSQL. BẮT BUỘC gọi tool này nếu bạn không chắc chắn về tên các cột trong bảng.""" """Lấy cấu trúc các cột của một bảng trong PostgreSQL."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối." if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ." if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try: try:
# Truy vấn vào bảng hệ thống của Postgres để lấy danh sách cột sql = "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s;"
sql = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = %s;
"""
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor: with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(sql, (table_name,)) cursor.execute(sql, (table_name,))
results = cursor.fetchall() results = cursor.fetchall()
if not results: return f"Bảng '{table_name}' không tồn tại."
if not results:
return f"Bảng '{table_name}' không tồn tại hoặc không có dữ liệu."
return json.dumps(results, indent=2, ensure_ascii=False) return json.dumps(results, indent=2, ensure_ascii=False)
except Exception as e: except Exception as e:
pg_conn.rollback() pg_conn.rollback()
return f"Lỗi lấy cấu trúc bảng: {str(e)}" return f"Lỗi lấy cấu trúc bảng: {str(e)}"
@mcp.tool() @mcp.tool()
def select_postgres_records(project_id: str, table_name: str, query: str = "{}", limit: int = 5) -> str: def select_postgres_records(table_name: str, query: str = "{}", limit: int = 5) -> str:
"""Lấy dữ liệu từ PostgreSQL. Tự động filter theo project_id và bỏ qua is_deleted.""" """Lấy dữ liệu từ PostgreSQL. Tham số query là chuỗi JSON hợp lệ."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối." if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ." if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try: try:
query_dict = parse_json_safe(query) query_dict = parse_json_safe(query)
# Ép buộc filter bảo mật
query_dict["project_id"] = project_id
query_dict["is_deleted"] = False
# Build câu lệnh SQL an toàn với tham số
where_clauses = [] where_clauses = []
values = [] values = []
for key, value in query_dict.items(): for key, value in query_dict.items():
if not validate_table_name(key): continue # Bỏ qua tên cột nguy hiểm if not validate_table_name(key): continue
where_clauses.append(f"{key} = %s") where_clauses.append(f"{key} = %s")
values.append(value) values.append(value)
where_sql = " AND ".join(where_clauses) where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
safe_limit = min(max(1, limit), 20) safe_limit = min(max(1, limit), 20)
sql = f"SELECT * FROM {table_name} WHERE {where_sql} LIMIT {safe_limit}" sql = f"SELECT * FROM {table_name} WHERE {where_sql} LIMIT {safe_limit}"
...@@ -440,68 +417,57 @@ def select_postgres_records(project_id: str, table_name: str, query: str = "{}", ...@@ -440,68 +417,57 @@ def select_postgres_records(project_id: str, table_name: str, query: str = "{}",
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor: with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(sql, tuple(values)) cursor.execute(sql, tuple(values))
results = cursor.fetchall() results = cursor.fetchall()
# Xử lý format ngày tháng và UUID để đưa vào JSON
# Xử lý datetime để convert sang chuỗi JSON được
for row in results: for row in results:
for k, v in row.items(): for k, v in row.items():
if isinstance(v, datetime): row[k] = v.isoformat() if isinstance(v, datetime): row[k] = v.isoformat()
elif hasattr(v, 'hex'): row[k] = str(v)
return json.dumps(results, indent=2, ensure_ascii=False) return json.dumps(results, indent=2, ensure_ascii=False)
except Exception as e: except Exception as e:
pg_conn.rollback() pg_conn.rollback()
return f"Lỗi truy vấn Postgres: {str(e)}" return f"Lỗi truy vấn Postgres: {str(e)}"
@mcp.tool() @mcp.tool()
def insert_postgres_record(project_id: str, table_name: str, data: str) -> str: def insert_postgres_record(table_name: str, data: str) -> str:
"""Thêm record mới vào PostgreSQL. Tự động gán project_id và timestamp.""" """Thêm record mới vào PostgreSQL. Data là JSON string."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối." if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ." if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try: try:
doc_dict = parse_json_safe(data) doc_dict = parse_json_safe(data)
if not isinstance(doc_dict, dict): return "Lỗi: Data phải là JSON object." if not isinstance(doc_dict, dict): return "Lỗi: Data phải là JSON object."
# Ép buộc metadata
doc_dict["project_id"] = project_id
doc_dict["created_at"] = datetime.now()
doc_dict["updated_at"] = doc_dict["created_at"]
doc_dict["is_deleted"] = False
columns = [] columns = []
values = [] values = []
placeholders = [] placeholders = []
for key, value in doc_dict.items(): for key, value in doc_dict.items():
if not validate_table_name(key): continue if not validate_table_name(key): continue
columns.append(key) columns.append(key)
values.append(value) values.append(value)
placeholders.append("%s") placeholders.append("%s")
sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(placeholders)}) RETURNING id" sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(placeholders)}) RETURNING *"
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
with pg_conn.cursor() as cursor:
cursor.execute(sql, tuple(values)) cursor.execute(sql, tuple(values))
inserted_id = cursor.fetchone()[0] inserted_row = cursor.fetchone()
return f"Thêm thành công! Inserted ID: {inserted_id}" for k, v in inserted_row.items():
if isinstance(v, datetime): inserted_row[k] = v.isoformat()
elif hasattr(v, 'hex'): inserted_row[k] = str(v)
return f"Thêm thành công! Dữ liệu: {json.dumps(inserted_row, ensure_ascii=False)}"
except Exception as e: except Exception as e:
pg_conn.rollback() pg_conn.rollback()
return f"Lỗi insert Postgres: {str(e)}" return f"Lỗi insert Postgres: {str(e)}"
@mcp.tool() @mcp.tool()
def update_postgres_record(project_id: str, table_name: str, record_id: str, update_data: str) -> str: def update_postgres_record(table_name: str, record_id: str, update_data: str, id_column: str = "id") -> str:
"""Cập nhật record PostgreSQL. Yêu cầu record_id (khóa chính).""" """Cập nhật record PostgreSQL."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối." if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ." if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try: try:
update_dict = parse_json_safe(update_data) update_dict = parse_json_safe(update_data)
update_dict.pop("project_id", None) update_dict.pop(id_column, None) # Chống sửa khóa chính
update_dict.pop("id", None)
update_dict["updated_at"] = datetime.now()
set_clauses = [] set_clauses = []
values = [] values = []
for key, value in update_dict.items(): for key, value in update_dict.items():
if not validate_table_name(key): continue if not validate_table_name(key): continue
set_clauses.append(f"{key} = %s") set_clauses.append(f"{key} = %s")
...@@ -509,37 +475,108 @@ def update_postgres_record(project_id: str, table_name: str, record_id: str, upd ...@@ -509,37 +475,108 @@ def update_postgres_record(project_id: str, table_name: str, record_id: str, upd
if not set_clauses: return "Lỗi: Không có dữ liệu hợp lệ để cập nhật." if not set_clauses: return "Lỗi: Không có dữ liệu hợp lệ để cập nhật."
# Thêm thông tin cho WHERE clause values.append(record_id)
values.extend([record_id, project_id]) sql = f"UPDATE {table_name} SET {', '.join(set_clauses)} WHERE {id_column} = %s"
sql = f"UPDATE {table_name} SET {', '.join(set_clauses)} WHERE id = %s AND project_id = %s AND is_deleted = False"
with pg_conn.cursor() as cursor: with pg_conn.cursor() as cursor:
cursor.execute(sql, tuple(values)) cursor.execute(sql, tuple(values))
if cursor.rowcount == 0: if cursor.rowcount == 0: return "Không tìm thấy record để sửa."
return "Không tìm thấy record hoặc không có quyền sửa."
return "Cập nhật thành công!" return "Cập nhật thành công!"
except Exception as e: except Exception as e:
pg_conn.rollback() pg_conn.rollback()
return f"Lỗi update Postgres: {str(e)}" return f"Lỗi update Postgres: {str(e)}"
@mcp.tool() @mcp.tool()
def delete_postgres_record(project_id: str, table_name: str, record_id: str) -> str: def delete_postgres_record(table_name: str, record_id: str, id_column: str = "id", status_column: str = "is_active") -> str:
"""Xóa mềm (Soft Delete) record PostgreSQL.""" """Xóa Mềm (Soft Delete) an toàn bằng cách cập nhật cờ trạng thái (mặc định is_active = False)."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối." if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ." if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try: try:
sql = f"UPDATE {table_name} SET is_deleted = True, updated_at = %s WHERE id = %s AND project_id = %s" # Cập nhật is_active = False thay vì xóa cứng
sql = f"UPDATE {table_name} SET {status_column} = False WHERE {id_column} = %s"
with pg_conn.cursor() as cursor: with pg_conn.cursor() as cursor:
cursor.execute(sql, (datetime.now(), record_id, project_id)) cursor.execute(sql, (record_id,))
if cursor.rowcount == 0: if cursor.rowcount == 0:
return "Không tìm thấy record hợp lệ để xóa." return "Không tìm thấy record hợp lệ để xóa."
return "Đã xóa (mềm) thành công record." return f"Đã xóa mềm thành công (set {status_column} = False)."
except Exception as e:
pg_conn.rollback()
return f"Lỗi delete Postgres: Có thể bảng này không có cột {status_column}. Chi tiết: {str(e)}"
@mcp.tool()
def get_postgres_foreign_keys(table_name: str) -> str:
"""Lấy danh sách các Khóa ngoại (Ràng buộc liên kết bảng) của một bảng."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try:
sql = """
SELECT
kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name = %s;
"""
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(sql, (table_name,))
results = cursor.fetchall()
if not results: return f"Bảng '{table_name}' không có khóa ngoại nào."
return json.dumps(results, indent=2, ensure_ascii=False)
except Exception as e:
pg_conn.rollback()
return f"Lỗi lấy khóa ngoại: {str(e)}"
@mcp.tool()
def get_postgres_triggers(table_name: str) -> str:
"""Lấy danh sách các Trigger (Logic tự động chạy) được gắn trên một bảng."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
if not validate_table_name(table_name): return "Lỗi: Tên bảng không hợp lệ."
try:
sql = """
SELECT event_manipulation AS event, action_statement AS function_call
FROM information_schema.triggers
WHERE event_object_table = %s;
"""
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(sql, (table_name,))
results = cursor.fetchall()
if not results: return f"Bảng '{table_name}' không có Trigger nào."
return json.dumps(results, indent=2, ensure_ascii=False)
except Exception as e:
pg_conn.rollback()
return f"Lỗi lấy triggers: {str(e)}"
@mcp.tool()
def get_postgres_function_code(function_name: str) -> str:
"""Lấy mã nguồn (source code) của một function hoặc stored procedure trong PostgreSQL.
Lưu ý: Chỉ truyền tên hàm, KHÔNG truyền dấu ngoặc đơn (). Ví dụ: truyền 'apply_stock_transaction' thay vì 'apply_stock_transaction()'."""
if pg_conn is None: return "Lỗi: PostgreSQL chưa kết nối."
# Xử lý chuỗi an toàn: Bỏ dấu () nếu AI lỡ truyền vào
clean_function_name = function_name.replace("()", "").strip()
try:
# Sử dụng hàm hệ thống pg_get_functiondef để lấy toàn bộ code
sql = """
SELECT pg_get_functiondef(p.oid) AS function_code
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE p.proname = %s AND n.nspname = 'public';
"""
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(sql, (clean_function_name,))
results = cursor.fetchall()
if not results:
return f"Không tìm thấy code của function '{clean_function_name}'. Có thể hàm không tồn tại hoặc sai tên."
return results[0]['function_code']
except Exception as e: except Exception as e:
pg_conn.rollback() pg_conn.rollback()
return f"Lỗi delete Postgres: {str(e)}" return f"Lỗi lấy code function: {str(e)}"
if __name__ == "__main__": if __name__ == "__main__":
mcp.run() mcp.run()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment