تصميم وحدة قياس النوايا الكمية السببية في المنصة LSys = C × I C = w₁·S + w₂·C + w₃·E + w₄·T class IntentAnalyzer: def calculate_lsys(self, text): compressed_bits = self.compress_text(text) coherence_score = self.calculate_coherence(text) return coherence_score * compressed_bits
def calculate_coherence(self, text):
specificity = self.analyze_specificity(text)
consistency = self.semantic_consistency(text)
emotion = self.emotional_intensity(text)
structure = self.structural_clarity(text)
return (0.3*specificity + 0.3*consistency +
0.2*emotion + 0.2*structure)
النية (LSys) → الإجراء → النتيجة
# بنية متكاملة لتحليل النوايا
class CausalIntentPlatform: def init(self): self.nlp_engine = NLPEngine() self.causal_analyzer = CausalAnalyzer() self.metric_calculator = MetricCalculator()
def analyze_intent_causality(self, text_sequence):
lsys_values = [self.calculate_lsys(text) for text in text_sequence]
causal_effects = self.causal_analyzer.analyze_sequence(lsys_values)
return {
'lsys_trend': lsys_values,
'causal_impact': causal_effects,
'recommendations': self.generate_recommendations(lsys_values)
}
إدخال النص → تحليل LSys → نمذجة سببية → توصيات قابلة للتنفيذ
# واجهة لتحليل النوايا
POST /api/analyze-intent { "text": "النص المدخل", "context": "سياق الاستخدام", "timestamp": "2024-01-01T00:00:00Z" }
{ "lsys_score": 45.7, "coherence_breakdown": { "specificity": 0.8, "consistency": 0.7, "emotion": 0.6, "structure": 0.9 }, "causal_predictions": [...] } ∂τ ∂ψ_x = -i Γ ∇_x ψ_x + Λ ψ_x (1 - (∥ψ_x∥² / |Ĝ ψ_x|²)) -i Γ ∇_x ψ_x Λ ψ_x (1 - (∥ψ_x∥² / |Ĝ ψ_x|²)) ∂τ ψ_x = -i Γ ∇_x ψ_x + Λ ψ_x (1 - 1) = -i Γ ∇_x ψ_x -i Γ ∇_x ψ_x + Λ ψ_x (1 - (∥ψ_x∥² / |Ĝ ψ_x|²)) = 0 C(τ) = ∫ ∥ψ_x(τ)∥² / |Ĝ ψ_x(τ)|² dx import numpy as np from scipy.fft import fft, ifft
def solve_causal_field(psi0, Gamma, Lambda, G_operator, dt, steps): """ حل المعادلة باستخدام طريقة الفروق المحدودة """ psi = psi0.copy() history = [psi0]
for step in range(steps):
# الجزء الخطي: -i Gamma * gradient
linear_term = -1j * Gamma * np.gradient(psi)
# الجزء غير الخطي
psi_norm = np.abs(psi)**2
G_psi = G_operator(psi)
G_norm = np.abs(G_psi)**2
# تجنب القسمة على صفر
with np.errstate(divide='ignore', invalid='ignore'):
ratio = np.divide(psi_norm, G_norm,
out=np.zeros_like(psi_norm),
where=G_norm!=0)
nonlinear_term = Lambda * psi * (1 - ratio)
# تحديث الحقل
psi = psi + dt * (linear_term + nonlinear_term)
history.append(psi.copy())
return np.array(history)
def low_pass_operator(psi, cutoff=0.1): psi_fft = fft(psi) n = len(psi) k = np.arange(n) filter_mask = np.exp(-(k/(cutoff*n))**2) return ifft(psi_fft * filter_mask) import numpy as np import networkx as nx import matplotlib.pyplot as plt from scipy.fft import fft, ifft from scipy.integrate import solve_ivp import plotly.graph_objects as go from plotly.subplots import make_subplots
class SocialIntentPropagation: def init(self, network_size=100, gamma=0.1, lambda_val=0.5, delta_t=0.01): self.N = network_size self.Gamma = gamma # معامل الانتشار self.Lambda = lambda_val # معامل النمو غير الخطي self.dt = delta_t
# إنشاء شبكة اجتماعية (مزيج من Small-world وScale-free)
self.G = self.create_social_network()
# تهيئة حقل النوايا (قيم مركبة)
self.psi = np.random.normal(0, 0.1, self.N) + 1j * np.random.normal(0, 0.1, self.N)
# مصفوفة التقارب (Laplacian matrix)
self.L = nx.laplacian_matrix(self.G).toarray()
def create_social_network(self):
"""إنشاء شبكة اجتماعية واقعية"""
# 70% Small-world (لتمثيل العلاقات القريبة)
G_sw = nx.watts_strogatz_graph(self.N, k=8, p=0.3)
# 30% Scale-free (لتمثيل المؤثرين)
G_sf = nx.barabasi_albert_graph(self.N, m=2)
# دمج الشبكتين
G = nx.compose(G_sw, G_sf)
return G
def global_coherence_operator(self, psi):
"""مؤثر التماسك العالمي Ĝ - متوسط مرجح للجوار"""
psi_global = np.zeros_like(psi, dtype=complex)
for i in range(self.N):
neighbors = list(self.G.neighbors(i))
if neighbors:
# متوسط مرجح بناءً على قوة الروابط
weights = [1.0] * len(neighbors) # يمكن جعلها أكثر تعقيداً
total_weight = sum(weights)
psi_global[i] = sum(psi[j] * w for j, w in zip(neighbors, weights)) / total_weight
else:
psi_global[i] = psi[i]
return psi_global
def intent_dynamics(self, t, psi_flat):
"""ديناميكيات انتشار النوايا - للمحاكاة العددية"""
psi = psi_flat[:self.N] + 1j * psi_flat[self.N:]
# الانتشار الخطي عبر الشبكة
diffusion = -1j * self.Gamma * (self.L @ psi)
# النمو غير الخطي
G_psi = self.global_coherence_operator(psi)
psi_norm = np.abs(psi)**2
G_norm = np.abs(G_psi)**2
# تجنب القسمة على صفر
with np.errstate(divide='ignore', invalid='ignore'):
ratio = np.divide(psi_norm, G_norm,
out=np.ones_like(psi_norm),
where=G_norm > 1e-10)
nonlinear_growth = self.Lambda * psi * (1 - ratio)
# المشتق الكلي
dpsi_dt = diffusion + nonlinear_growth
# تحويل إلى صيغة حقيقية للمحاكاة
return np.concatenate([dpsi_dt.real, dpsi_dt.imag])
def simulate(self, T=10, initial_intent_nodes=None):
"""تشغيل المحاكاة"""
if initial_intent_nodes:
# تهيئة نوايا أولية في عقد محددة
for node, strength in initial_intent_nodes.items():
self.psi[node] = strength * (1 + 1j)
# الوقت
t_span = (0, T)
t_eval = np.linspace(0, T, int(T/self.dt))
# الشروط الأولية
psi0 = np.concatenate([self.psi.real, self.psi.imag])
# حل المعادلات التفاضلية
solution = solve_ivp(self.intent_dynamics, t_span, psi0,
t_eval=t_eval, method='RK45')
# استخراج النتائج
self.time = solution.t
self.psi_history = []
for i in range(len(solution.t)):
psi_real = solution.y[:self.N, i]
psi_imag = solution.y[self.N:, i]
self.psi_history.append(psi_real + 1j * psi_imag)
return self.psi_history
def calculate_metrics(self):
"""حساب مقاييس التماسك والانتشار"""
coherence_local = []
coherence_global = []
intent_strength = []
for psi in self.psi_history:
# قوة النية المحلية
local_strength = np.abs(psi)**2
intent_strength.append(np.mean(local_strength))
# التماسك المحلي
coherence_local.append(np.std(local_strength))
# التماسك العالمي
G_psi = self.global_coherence_operator(psi)
global_strength = np.abs(G_psi)**2
coherence_global.append(np.mean(global_strength))
return {
'intent_strength': intent_strength,
'local_coherence': coherence_local,
'global_coherence': coherence_global
}
class IntentVisualization:
def __init__(self, simulator):
self.simulator = simulator
def plot_network_evolution(self, time_points=None):
"""تصور تطور النوايا في الشبكة عبر الزمن"""
if time_points is None:
time_points = [0, len(self.simulator.psi_history)//3,
2*len(self.simulator.psi_history)//3,
-1]
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.flatten()
pos = nx.spring_layout(self.simulator.G)
for i, t_idx in enumerate(time_points):
if t_idx >= len(self.simulator.psi_history):
continue
psi = self.simulator.psi_history[t_idx]
node_colors = np.abs(psi) # شدة النية
node_sizes = 500 * (np.angle(psi) + np.pi) / (2 * np.pi) # طور النية
nx.draw(self.simulator.G, pos, ax=axes[i],
node_color=node_colors,
node_size=node_sizes,
cmap='viridis',
edge_color='gray',
alpha=0.7)
axes[i].set_title(f'الزمن: {self.simulator.time[t_idx]:.2f}')
plt.suptitle('تطور انتشار النوايا في الشبكة الاجتماعية')
plt.tight_layout()
plt.show()
def plot_metrics_time_series(self, metrics):
"""رسم مقاييس التماسك عبر الزمن"""
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 10))
# قوة النية
ax1.plot(self.simulator.time, metrics['intent_strength'])
ax1.set_ylabel('قوة النية المتوسطة')
ax1.grid(True, alpha=0.3)
# التماسك المحلي
ax2.plot(self.simulator.time, metrics['local_coherence'])
ax2.set_ylabel('التماسك المحلي (انحراف معياري)')
ax2.grid(True, alpha=0.3)
# التماسك العالمي
ax3.plot(self.simulator.time, metrics['global_coherence'])
ax3.set_ylabel('التماسك العالمي')
ax3.set_xlabel('الزمن')
ax3.grid(True, alpha=0.3)
plt.suptitle('مقاييس انتشار النوايا عبر الزمن')
plt.tight_layout()
plt.show()
def create_interactive_plot(self):
"""إنشاء تصور تفاعلي باستخدام Plotly"""
pos = nx.spring_layout(self.simulator.G)
fig = make_subplots(rows=1, cols=1, subplot_titles=['انتشار النوايا في الشبكة الاجتماعية'])
# إضافة الروابط
edge_x = []
edge_y = []
for edge in self.simulator.G.edges():
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
fig.add_trace(go.Scatter(x=edge_x, y=edge_y,
line=dict(width=0.5, color='#888'),
hoverinfo='none',
mode='lines'),
row=1, col=1)
# إضافة العقد (لآخر خطوة زمنية)
psi_final = self.simulator.psi_history[-1]
node_x = [pos[node][0] for node in self.simulator.G.nodes()]
node_y = [pos[node][1] for node in self.simulator.G.nodes()]
fig.add_trace(go.Scatter(x=node_x, y=node_y,
mode='markers',
hoverinfo='text',
marker=dict(
size=20,
color=np.abs(psi_final),
colorscale='Viridis',
showscale=True,
colorbar=dict(title="شدة النية")
)),
row=1, col=1)
fig.update_layout(title_text="محاكاة انتشار النوايا في الشبكة الاجتماعية",
showlegend=False)
fig.show()
# إنشاء وتشغيل المحاكاة
simulator = SocialIntentPropagation(network_size=50, gamma=0.2, lambda_val=0.8)
initial_intents = { 0: 2.0, # مؤثر رئيسي 5: 1.5, # مؤثر ثانوي 10: 1.2 # مؤثر ثانوي }
psi_history = simulator.simulate(T=20, initial_intent_nodes=initial_intents)
metrics = simulator.calculate_metrics()
viz = IntentVisualization(simulator) viz.plot_network_evolution() viz.plot_metrics_time_series(metrics) viz.create_interactive_plot() def analyze_propagation_performance(metrics, time_window=5): """تحليل أداء انتشار النوايا""" strength = metrics['intent_strength'] local_coh = metrics['local_coherence'] global_coh = metrics['global_coherence']
# سرعة الانتشار
propagation_speed = (strength[-1] - strength[0]) / len(strength)
# كفاءة الانتشار (نسبة التماسك العالمي إلى المحلي)
efficiency = np.mean([g/l if l > 0 else 0 for g, l in zip(global_coh, local_coh)])
# استقرار النظام
stability = 1.0 / (1.0 + np.std(strength[-time_window:]))
return {
'propagation_speed': propagation_speed,
'efficiency': efficiency,
'stability': stability,
'final_coverage': strength[-1] / max(strength) if max(strength) > 0 else 0
}
def optimize_campaign(network, target_coverage=0.8):
"""تحسين اختيار المؤثرين لحملة توعية"""
centrality = nx.eigenvector_centrality(network)
best_influencers = sorted(centrality.items(), key=lambda x: x[1], reverse=True)[:5]
return {node: 2.0 for node, _ in best_influencers}
def detect_rumor_pattern(psi_history, threshold=0.7):
"""كشف أنماط انتشار الشائعات"""
patterns = []
for t, psi in enumerate(psi_history):
coherence_ratio = np.std(np.abs(psi)) / np.mean(np.abs(psi))
if coherence_ratio > threshold:
patterns.append({
'time': t,
'anomaly_score': coherence_ratio,
'affected_nodes': np.sum(np.abs(psi) > np.mean(np.abs(psi)))
})
return patterns
import pandas as pd
import tweepy from sklearn.preprocessing import StandardScaler import torch import torch.nn as nn
class MultiIntentSocialModel: def init(self, network_size=100, intent_types=3): self.N = network_size self.K = intent_types # أنواع النوايا self.intent_names = ['إيجابي', 'سلبي', 'محايد'] # أسماء الأنواع
# مصفوفات التفاعل بين أنواع النوايا
self.interaction_matrix = self.create_interaction_matrix()
# تهيئة حقول النوايا لكل نوع
self.psi = np.random.normal(0, 0.1, (self.K, self.N)) + \
1j * np.random.normal(0, 0.1, (self.K, self.N))
def create_interaction_matrix(self):
"""مصفوفة التفاعلات بين أنواع النوايا"""
# مصفوفة K x K تمثل التفاعلات
# القطر الرئيسي: تفاعل ذاتي، خارج القطر: تفاعل مع الأنواع الأخرى
interaction = np.array([
[0.8, -0.3, 0.1], # إيجابي يتنافر مع سلبي، يتفاعل إيجابياً مع محايد
[-0.3, 0.7, -0.2], # سلبي يتنافر مع إيجابي ومحايد
[0.1, -0.2, 0.6] # محايد يتفاعل weakly مع الآخرين
])
return interaction[:self.K, :self.K]
def coupled_intent_dynamics(self, t, psi_flat):
"""ديناميكيات مترابطة لأنواع متعددة من النوايا"""
# إعادة تشكيل المتجه المسطح
psi_flat = psi_flat.reshape((self.K, 2 * self.N))
psi = psi_flat[:, :self.N] + 1j * psi_flat[:, self.N:]
dpsi_dt = np.zeros_like(psi, dtype=complex)
for k in range(self.K):
# الانتشار داخل النوع الواحد
diffusion = -1j * self.Gamma * (self.L @ psi[k])
# التفاعلات بين الأنواع المختلفة
interaction_term = np.zeros(self.N, dtype=complex)
for j in range(self.K):
if j != k:
# تفاعل غير خطي بين النوع k والنوع j
interaction_strength = self.interaction_matrix[k, j]
interaction_term += interaction_strength * psi[j] * np.conj(psi[k])
# النمو الذاتي غير الخطي
G_psi_k = self.global_coherence_operator(psi[k])
psi_norm = np.abs(psi[k])**2
G_norm = np.abs(G_psi_k)**2
with np.errstate(divide='ignore', invalid='ignore'):
ratio = np.divide(psi_norm, G_norm,
out=np.ones_like(psi_norm),
where=G_norm > 1e-10)
nonlinear_growth = self.Lambda * psi[k] * (1 - ratio)
# المعادلة الكاملة للنوع k
dpsi_dt[k] = diffusion + nonlinear_growth + interaction_term
# تحويل إلى صيغة حقيقية
return np.concatenate([dpsi_dt.real, dpsi_dt.imag], axis=1).flatten()
class RealSocialNetworkIntegration:
def __init__(self, api_keys=None):
self.api_keys = api_keys
self.scaler = StandardScaler()
def load_twitter_network(self, username, max_followers=1000):
"""تحميل شبكة توتر حقيقية"""
if self.api_keys:
auth = tweepy.OAuthHandler(self.api_keys['consumer_key'],
self.api_keys['consumer_secret'])
auth.set_access_token(self.api_keys['access_token'],
self.api_keys['access_token_secret'])
api = tweepy.API(auth, wait_on_rate_limit=True)
# الحصول على المتابعين
followers = []
try:
for follower in tweepy.Cursor(api.get_followers, screen_name=username).items(max_followers):
followers.append(follower.screen_name)
except tweepy.TweepError as e:
print(f"Error: {e}")
return self.create_network_from_followers(followers)
else:
# استخدام بيانات تجريبية إذا لم توجد مفاتيح API
return self.create_sample_network()
def load_facebook_data(self, dataset_path):
"""تحميل بيانات فيسبوك من مجموعة بيانات Stanford"""
try:
# تحميل من Stanford Large Network Dataset
edges = pd.read_csv(f'{dataset_path}/edges.csv')
nodes = pd.read_csv(f'{dataset_path}/nodes.csv')
G = nx.Graph()
G.add_edges_from(edges.values)
return G
except:
return nx.karate_club_graph() # شبكة تجريبية
def extract_intent_from_text(self, text, intent_classifier):
"""استخراج النوايا من النصوص باستخدام نموذج تصنيف"""
# استخدام نموذج مسبق التدريب لتحليل المشاعر والنوايا
probabilities = intent_classifier.predict_proba([text])[0]
# تحويل إلى متجه نوايا مركب (لتمثيل القوة والطور)
intent_vector = []
for prob in probabilities:
magnitude = prob # القوة
phase = 2 * np.pi * prob # الطور
intent_vector.append(magnitude * np.exp(1j * phase))
return np.array(intent_vector)
def create_hybrid_network(self, online_network, offline_network):
"""دمج الشبكات الافتراضية والواقعية"""
return nx.compose(online_network, offline_network)
class IntentClassifier: """نموذج تصنيف النوايا باستخدام التعلم العميق""" def init(self, vocab_size=10000, embedding_dim=128, num_intents=3): self.model = nn.Sequential( nn.Embedding(vocab_size, embedding_dim), nn.LSTM(embedding_dim, 64, batch_first=True), nn.Linear(64, 32), nn.ReLU(), nn.Linear(32, num_intents), nn.Softmax(dim=1) )
def predict_proba(self, texts):
"""تنبؤ باحتمالات أنواع النوايا"""
# تنفيذ مبسط - في التطبيق الحقيقي تحتاج preprocessing
return np.random.dirichlet(np.ones(3), size=len(texts))
class RealDataSocialSimulator:
def __init__(self, network_data, text_data=None):
self.network = network_data
self.text_data = text_data
self.N = len(self.network.nodes())
# نموذج النوايا المتعددة
self.intent_model = MultiIntentSocialModel(network_size=self.N)
# مصنف النوايا
self.intent_classifier = IntentClassifier()
def initialize_intents_from_data(self):
"""تهيئة النوايا من البيانات الحقيقية"""
if self.text_data is not None:
for node_id, node_data in self.text_data.items():
if 'text' in node_data:
text = node_data['text']
intent_vector = self.extract_intent_from_text(text, self.intent_classifier)
# تعيين النوايا المستخرجة للعقدة
for k in range(len(intent_vector)):
self.intent_model.psi[k, node_id] = intent_vector[k]
def simulate_real_world_scenario(self, T=30, events=None):
"""محاكاة سيناريو واقعي مع أحداث خارجية"""
# تهيئة من البيانات الحقيقية
self.initialize_intents_from_data()
# محاكاة الأحداث الخارجية (مثل أخبار، حملات)
if events:
self.apply_external_events(events)
# تشغيل المحاكاة
results = self.intent_model.simulate(T=T)
return results
def apply_external_events(self, events):
"""تطبيق أحداث خارجية على الشبكة"""
for event in events:
if event['type'] == 'news':
# تأثير الأخبار على نوايا محددة
affected_nodes = self.get_susceptible_nodes(event['topic'])
for node in affected_nodes:
self.intent_model.psi[event['intent_type'], node] *= event['impact']
elif event['type'] == 'campaign':
# حملة توعية أو إعلانية
target_nodes = event.get('target_nodes', range(self.N))
for node in target_nodes:
self.intent_model.psi[event['intent_type'], node] += event['strength']
def get_susceptible_nodes(self, topic):
"""العقد الأكثر تأثراً بموضوع معين"""
# محاكاة مبسطة - في التطبيق الحقيقي تستخدم تحليل شبكة
centrality = nx.eigenvector_centrality(self.network)
return sorted(centrality, key=centrality.get, reverse=True)[:10]
class AdvancedIntentAnalytics:
def __init__(self, simulator):
self.simulator = simulator
self.results = None
def analyze_cross_intent_correlation(self, psi_history):
"""تحليل الارتباط بين أنواع النوايا المختلفة"""
correlations = np.zeros((self.simulator.intent_model.K,
self.simulator.intent_model.K,
len(psi_history)))
for t, psi in enumerate(psi_history):
for i in range(self.simulator.intent_model.K):
for j in range(self.simulator.intent_model.K):
corr = np.corrcoef(np.abs(psi[i]), np.abs(psi[j]))[0,1]
correlations[i, j, t] = corr if not np.isnan(corr) else 0
return correlations
def detect_phase_synchronization(self, psi_history, threshold=0.8):
"""كشف التزامن الطوري بين أنواع النوايا"""
sync_events = []
for t, psi in enumerate(psi_history):
phases = np.angle(psi) # الأطوار لكل نوع نية
# حساب التزامن الطوري
sync_matrix = np.zeros((self.simulator.intent_model.K,
self.simulator.intent_model.K))
for i in range(self.simulator.intent_model.K):
for j in range(i+1, self.simulator.intent_model.K):
phase_diff = np.mod(phases[i] - phases[j], 2*np.pi)
sync_strength = np.abs(np.mean(np.exp(1j * phase_diff)))
sync_matrix[i, j] = sync_strength
# تسجيل أحداث التزامن
strong_sync = np.where(sync_matrix > threshold)
if len(strong_sync[0]) > 0:
sync_events.append({
'time': t,
'sync_pairs': list(zip(strong_sync[0], strong_sync[1])),
'strength': sync_matrix[strong_sync]
})
return sync_events
def intent_competition_analysis(self, psi_history):
"""تحليل التنافس بين أنواع النوايا"""
dominance = np.zeros((self.simulator.intent_model.K, len(psi_history)))
for t, psi in enumerate(psi_history):
strengths = np.mean(np.abs(psi)**2, axis=1) # متوسط القوة لكل نوع
total_strength = np.sum(strengths)
if total_strength > 0:
dominance[:, t] = strengths / total_strength
return dominance
# مثال تطبيقي مع بيانات توتر تجريبية
def demo_real_world_simulation(): # دمج بيانات حقيقية network_integrator = RealSocialNetworkIntegration()
# تحميل شبكة توتر (بيانات تجريبية إذا لم توجد مفاتيح API)
twitter_network = network_integrator.load_twitter_network("example_user")
# بيانات نصية محاكاة (في التطبيق الحقيقي تكون من التغريدات)
sample_text_data = {
0: {"text": "أحب هذا المنتج الجديد! إنه رائع 👏", "user": "user1"},
1: {"text": "مستاء من الخدمة، كانت تجربة سيئة 😠", "user": "user2"},
2: {"text": "أقرأ عن الموضوع، لا أملك رأيًا واضحًا بعد", "user": "user3"}
}
# إنشاء المحاكي
simulator = RealDataSocialSimulator(twitter_network, sample_text_data)
# تعريف أحداث خارجية
events = [
{
'type': 'news',
'topic': 'منتج جديد',
'intent_type': 0, # نية إيجابية
'impact': 1.5,
'time': 5
},
{
'type': 'campaign',
'intent_type': 1, # نية سلبية
'strength': -0.3, # تقليل النية السلبية
'target_nodes': [0, 1, 2],
'time': 10
}
]
# تشغيل المحاكاة
results = simulator.simulate_real_world_scenario(T=20, events=events)
# التحليلات المتقدمة
analytics = AdvancedIntentAnalytics(simulator)
correlations = analytics.analyze_cross_intent_correlation(results)
sync_events = analytics.detect_phase_synchronization(results)
dominance = analytics.intent_competition_analysis(results)
return {
'simulator': simulator,
'results': results,
'analytics': {
'correlations': correlations,
'sync_events': sync_events,
'dominance': dominance
}
}
simulation_results = demo_real_world_simulation() class MultiIntentVisualization: def init(self, simulation_results): self.results = simulation_results
def plot_intent_evolution_3d(self):
"""تصور ثلاثي الأبعاد لتطور النوايا"""
fig = plt.figure(figsize=(15, 10))
# تطور القوة لكل نوع نية
for k in range(self.results['simulator'].intent_model.K):
strengths = [np.mean(np.abs(psi[k])**2) for psi in self.results['results']]
plt.plot(self.results['simulator'].intent_model.time, strengths,
label=self.results['simulator'].intent_model.intent_names[k],
linewidth=2)
plt.xlabel('الزمن')
plt.ylabel('قوة النية المتوسطة')
plt.title('تطور قوى أنواع النوايا المختلفة')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
def plot_interaction_network(self, time_step=-1):
"""تصور شبكة التفاعلات بين النوايا"""
psi = self.results['results'][time_step]
fig, axes = plt.subplots(1, self.results['simulator'].intent_model.K,
figsize=(20, 6))
for k in range(self.results['simulator'].intent_model.K):
node_colors = np.abs(psi[k])
node_sizes = 300 * (np.angle(psi[k]) + np.pi) / (2 * np.pi)
pos = nx.spring_layout(self.results['simulator'].network)
nx.draw(self.results['simulator'].network, pos, ax=axes[k],
node_color=node_colors, node_size=node_sizes,
cmap='viridis', edge_color='gray', alpha=0.7)
axes[k].set_title(f'{self.results["simulator"].intent_model.intent_names[k]} - الزمن: {time_step}')
plt.tight_layout()
plt.show()
def create_interactive_dashboard(self):
"""لوحة تحكم تفاعلية متكاملة"""
import plotly.express as px
from plotly.subplots import make_subplots
# تجهيز البيانات
time_points = self.results['simulator'].intent_model.time
intent_names = self.results['simulator'].intent_model.intent_names
# إنشاء لوحة متعددة الرسوم
fig = make_subplots(rows=2, cols=2,
subplot_titles=['تطور النوايا', 'السيطرة النسبية',
'الارتباطات', 'أحداث التزامن'])
# إضافة الرسوم البيانية
self._add_evolution_plot(fig, time_points, intent_names, row=1, col=1)
self._add_dominance_plot(fig, time_points, intent_names, row=1, col=2)
self._add_correlation_plot(fig, time_points, intent_names, row=2, col=1)
self._add_sync_events_plot(fig, time_points, row=2, col=2)
fig.update_layout(height=800, title_text="لوحة تحليل تفاعلات النوايا")
fig.show()
def integrate_with_causal_platform(multi_intent_model, causal_platform):
"""تكامل نموذج النوايا المتعددة مع المنصة السببية الرئيسية"""
# إضافة وحدة تحليل التفاعلات
causal_platform.add_module('multi_intent_analysis', {
'model': multi_intent_model,
'analytics': AdvancedIntentAnalytics(multi_intent_model),
'visualization': MultiIntentVisualization(multi_intent_model)
})
# ربط مع مقاييس LSys
def calculate_composite_lsys(psi):
"""حساب LSys مركب لأنواع متعددة"""
composite_intent = np.sum(psi, axis=0) # جمع المتجهات
return causal_platform.calculate_lsys(composite_intent)
return calculate_composite_lsys
import numpy as np
from scipy.integrate import solve_ivp import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D
class CompetitiveCooperativeIntentModel: def init(self, num_intent_types=4, network_size=100): self.K = num_intent_types self.N = network_size
# أنواع النوايا: [إيجابي، سلبي، محايد، استفساري]
self.intent_types = ['إيجابي', 'سلبي', 'محايد', 'استفساري']
# مصفوفة التفاعل الديناميكية
self.interaction_matrix = self.create_dynamic_interaction_matrix()
# معاملات النمو الذاتي
self.growth_rates = np.array([0.8, 0.6, 0.4, 0.5])
# سعات الحمل (القدرة الاستيعابية للشبكة)
self.carrying_capacities = np.array([1.0, 0.8, 1.2, 0.9])
# تهيئة حالات النوايا
self.initialize_intent_states()
def create_dynamic_interaction_matrix(self):
"""مصفوفة تفاعل ديناميكية تعكس العلاقات المعقدة"""
# القيم الإيجابية: تعاون، القيم السلبية: تنافس
interaction = np.array([
[0.8, -0.6, 0.2, 0.3], # إيجابي يتنافس مع سلبي، يتعاون مع الآخرين
[-0.6, 0.7, -0.4, -0.2], # سلبي يتنافس مع الجميع
[0.2, -0.4, 0.6, 0.1], # محايد يتعاون مع إيجابي
[0.3, -0.2, 0.1, 0.5] # استفساري يتعاون مع إيجابي
])
return interaction[:self.K, :self.K]
def initialize_intent_states(self):
"""تهيئة التوزيع الأولي للنوايا"""
self.X = np.random.uniform(0.1, 0.5, (self.K, self.N))
def competitive_cooperative_dynamics(self, t, X_flat):
"""ديناميكيات التنافس والتعاون المستوحاة من معادلات لوتكا-فولتيرا"""
X = X_flat.reshape((self.K, self.N))
dX_dt = np.zeros_like(X)
for i in range(self.K):
# النمو الذاتي اللوجستي
growth_term = self.growth_rates[i] * X[i] * \
(1 - X[i] / self.carrying_capacities[i])
# تفاعلات مع الأنواع الأخرى
interaction_term = np.zeros(self.N)
for j in range(self.K):
if i != j:
interaction_strength = self.interaction_matrix[i, j]
interaction_term += interaction_strength * X[i] * X[j]
# التأثير الشبكي (انتشار عبر الشبكة)
if hasattr(self, 'laplacian'):
diffusion_term = 0.1 * (self.laplacian @ X[i])
else:
diffusion_term = 0
# المعادلة الكاملة
dX_dt[i] = growth_term + interaction_term + diffusion_term
return dX_dt.flatten()
def set_network_topology(self, graph):
"""تعيين طوبولوجيا الشبكة الاجتماعية"""
self.laplacian = nx.laplacian_matrix(graph).toarray()
self.network = graph
def simulate_interactions(self, T=50, initial_conditions=None):
"""محاكاة التفاعلات عبر الزمن"""
if initial_conditions:
self.X = initial_conditions.copy()
t_span = (0, T)
t_eval = np.linspace(0, T, 1000)
solution = solve_ivp(self.competitive_cooperative_dynamics,
t_span, self.X.flatten(),
t_eval=t_eval, method='RK45')
# استخراج النتائج
self.time = solution.t
self.history = solution.y.reshape((self.K, self.N, -1))
return self.history
class InteractionDynamicsAnalyzer:
def __init__(self, model):
self.model = model
self.analysis_results = {}
def calculate_competition_cooperation_metrics(self, history):
"""حساب مقاييس التنافس والتعاون"""
K, N, T = history.shape
competition_index = np.zeros(T)
cooperation_index = np.zeros(T)
dominance_entropy = np.zeros(T)
for t in range(T):
X_t = history[:, :, t]
# مؤشر التنافس (مجموع التفاعلات السلبية)
negative_interactions = 0
positive_interactions = 0
for i in range(K):
for j in range(i+1, K):
interaction = self.model.interaction_matrix[i, j]
product = np.mean(X_t[i] * X_t[j])
if interaction < 0:
negative_interactions += abs(interaction) * product
else:
positive_interactions += interaction * product
competition_index[t] = negative_interactions
cooperation_index[t] = positive_interactions
# إنتروبيا السيطرة (قياس التنوع في الهيمنة)
dominance = np.mean(X_t, axis=1)
dominance_normalized = dominance / np.sum(dominance)
dominance_entropy[t] = -np.sum(dominance_normalized *
np.log(dominance_normalized + 1e-10))
return {
'competition_index': competition_index,
'cooperation_index': cooperation_index,
'dominance_entropy': dominance_entropy
}
def detect_phase_transitions(self, history, window_size=50):
"""كشف التحولات الطورية في النظام"""
K, N, T = history.shape
# تباين النظام عبر الزمن
system_variance = np.zeros(T)
correlation_length = np.zeros(T)
for t in range(T):
# تباين شامل
system_variance[t] = np.var(history[:, :, t])
# طول الارتباط (مقياس للتنسيق)
if hasattr(self.model, 'laplacian'):
corr_matrix = np.corrcoef(history[:, :, t])
eigenvalues = np.linalg.eigvals(corr_matrix)
correlation_length[t] = 1.0 / (1 - np.max(eigenvalues))
# كشف النقاط الحرجة
critical_points = []
variance_derivative = np.gradient(system_variance)
for t in range(window_size, T - window_size):
if abs(variance_derivative[t]) > 2 * np.std(variance_derivative):
critical_points.append(t)
return {
'system_variance': system_variance,
'correlation_length': correlation_length,
'critical_points': critical_points,
'variance_derivative': variance_derivative
}
def analyze_attractor_dynamics(self, history):
"""تحليل ديناميكيات الجاذبات في فضاء الطور"""
from sklearn.decomposition import PCA
# اختزال الأبعاد لتصور فضاء الطور
pca = PCA(n_components=3)
phase_space_data = []
for t in range(history.shape[2]):
flattened = history[:, :, t].flatten()
phase_space_data.append(flattened)
phase_space_data = np.array(phase_space_data)
phase_space_reduced = pca.fit_transform(phase_space_data)
# تحليل الاستقرار
stability_analysis = self.calculate_lyapunov_exponents(history)
return {
'phase_space_reduced': phase_space_reduced,
'pca_components': pca.components_,
'explained_variance': pca.explained_variance_ratio_,
'lyapunov_exponents': stability_analysis
}
def calculate_lyapunov_exponents(self, history, epsilon=1e-6):
"""حساب أسس لياپونوف لتحليل الاستقرار"""
# تنفيذ مبسط - في التطبيق الحقيقي يحتاج خوارزمية متقدمة
K, N, T = history.shape
exponents = []
for k in range(K):
# اختلاف بسيط في الشروط الأولية
diff_evolution = []
for t in range(1, min(100, T)):
diff = np.mean(abs(history[k, :, t] - history[k, :, t-1]))
diff_evolution.append(diff)
if len(diff_evolution) > 1:
exponent = np.polyfit(range(len(diff_evolution)),
np.log(np.array(diff_evolution) + epsilon), 1)[0]
exponents.append(exponent)
return exponents
class ComplexInteractionScenarios:
def __init__(self, model):
self.model = model
self.analyzer = InteractionDynamicsAnalyzer(model)
def scenario_competitive_dominance(self, T=100):
"""سيناريو السيطرة التنافسية"""
# شروط أولية: نوع واحد مسيطر
initial_conditions = np.zeros((self.model.K, self.model.N))
initial_conditions[0] = 0.8 # إيجابي مسيطر
initial_conditions[1] = 0.1 # سلبي ضعيف
history = self.model.simulate_interactions(T=T,
initial_conditions=initial_conditions)
analysis = self.analyzer.calculate_competition_cooperation_metrics(history)
phase_transitions = self.analyzer.detect_phase_transitions(history)
return {
'history': history,
'analysis': analysis,
'phase_transitions': phase_transitions,
'scenario': 'السيطرة التنافسية'
}
def scenario_cooperative_coexistence(self, T=100):
"""سيناريو التعايش التعاوني"""
# تعديل مصفوفة التفاعل لتعزيز التعاون
original_matrix = self.model.interaction_matrix.copy()
self.model.interaction_matrix = np.array([
[0.7, 0.1, 0.2, 0.3],
[0.1, 0.6, 0.1, 0.2],
[0.2, 0.1, 0.5, 0.1],
[0.3, 0.2, 0.1, 0.4]
])
# توزيع متوازن أولي
initial_conditions = np.random.uniform(0.3, 0.5, (self.model.K, self.model.N))
history = self.model.simulate_interactions(T=T,
initial_conditions=initial_conditions)
analysis = self.analyzer.calculate_competition_cooperation_metrics(history)
# استعادة المصفوفة الأصلية
self.model.interaction_matrix = original_matrix
return {
'history': history,
'analysis': analysis,
'scenario': 'التعايش التعاوني'
}
def scenario_oscillatory_behavior(self, T=200):
"""سيناريو السلوك التذبذبي"""
# مصفوفة تفاعل تسبب التذبذب
self.model.interaction_matrix = np.array([
[0.5, -0.8, 0.1, 0.2],
[-0.8, 0.5, -0.3, -0.1],
[0.1, -0.3, 0.4, 0.1],
[0.2, -0.1, 0.1, 0.3]
])
# شروط أولية متقاربة
initial_conditions = np.random.uniform(0.4, 0.6, (self.model.K, self.model.N))
history = self.model.simulate_interactions(T=T,
initial_conditions=initial_conditions)
attractor_analysis = self.analyzer.analyze_attractor_dynamics(history)
return {
'history': history,
'attractor_analysis': attractor_analysis,
'scenario': 'السلوك التذبذبي'
}
def scenario_external_shock(self, T=150, shock_time=50):
"""سيناريو الصدمة الخارجية"""
# محاكاة تأثير حدث خارجي مفاجئ
history = self.model.simulate_interactions(T=shock_time)
# تطبيق الصدمة
shock_intensity = 0.5
shocked_state = history[:, :, -1].copy()
shocked_state[1] += shock_intensity # تعزيز النوايا السلبية
# متابعة المحاكاة بعد الصدمة
post_shock_history = self.model.simulate_interactions(
T=T-shock_time, initial_conditions=shocked_state)
# دمج التاريخين
full_history = np.concatenate([history, post_shock_history], axis=2)
resilience_metrics = self.analyze_system_resilience(full_history, shock_time)
return {
'history': full_history,
'resilience_metrics': resilience_metrics,
'shock_time': shock_time,
'scenario': 'الصدمة الخارجية'
}
def analyze_system_resilience(self, history, shock_time):
"""تحليل مرونة النظام"""
recovery_threshold = 0.1
pre_shock_stability = np.std(history[:, :, :shock_time])
post_shock_variance = np.std(history[:, :, shock_time:])
# وقت التعافي
recovery_time = None
for t in range(shock_time, history.shape[2]):
current_stability = np.std(history[:, :, t])
if abs(current_stability - pre_shock_stability) < recovery_threshold:
recovery_time = t - shock_time
break
return {
'pre_shock_stability': pre_shock_stability,
'post_shock_variance': post_shock_variance,
'recovery_time': recovery_time,
'resilience_index': 1.0 / (recovery_time + 1) if recovery_time else 0
}
class AdvancedInteractionVisualization:
def __init__(self, scenarios_results):
self.results = scenarios_results
def plot_competition_cooperation_evolution(self, scenario_results):
"""رسم تطور مؤشرات التنافس والتعاون"""
analysis = scenario_results['analysis']
time = np.linspace(0, 100, len(analysis['competition_index']))
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# مؤشر التنافس والتعاون
ax1.plot(time, analysis['competition_index'], 'r-', label='التنافس', linewidth=2)
ax1.plot(time, analysis['cooperation_index'], 'g-', label='التعاون', linewidth=2)
ax1.set_ylabel('شدة التفاعل')
ax1.legend()
ax1.grid(True, alpha=0.3)
ax1.set_title(f'تطور التنافس والتعاون - {scenario_results["scenario"]}')
# إنتروبيا السيطرة
ax2.plot(time, analysis['dominance_entropy'], 'b-', label='تنوع السيطرة', linewidth=2)
ax2.set_ylabel('إنتروبيا السيطرة')
ax2.set_xlabel('الزمن')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def plot_phase_space_3d(self, scenario_results):
"""تصور ثلاثي الأبعاد لفضاء الطور"""
if 'attractor_analysis' in scenario_results:
phase_space = scenario_results['attractor_analysis']['phase_space_reduced']
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
# تلوين وفق الزمن
colors = plt.cm.viridis(np.linspace(0, 1, len(phase_space)))
ax.scatter(phase_space[:, 0], phase_space[:, 1], phase_space[:, 2],
c=colors, alpha=0.6, s=20)
# رسم المسار
ax.plot(phase_space[:, 0], phase_space[:, 1], phase_space[:, 2],
'gray', alpha=0.3)
ax.set_xlabel('المكون الرئيسي 1')
ax.set_ylabel('المكون الرئيسي 2')
ax.set_zlabel('المكون الرئيسي 3')
ax.set_title(f'فضاء الطور - {scenario_results["scenario"]}')
plt.show()
def plot_system_resilience(self, scenario_results):
"""تصور مرونة النظام بعد الصدمة"""
if 'resilience_metrics' in scenario_results:
history = scenario_results['history']
shock_time = scenario_results['shock_time']
# حساب الاستقرار عبر الزمن
stability = [np.std(history[:, :, t]) for t in range(history.shape[2])]
plt.figure(figsize=(12, 6))
plt.plot(stability, 'b-', linewidth=2)
plt.axvline(x=shock_time, color='r', linestyle='--',
label=f'وقت الصدمة: {shock_time}')
if scenario_results['resilience_metrics']['recovery_time']:
recovery_point = shock_time + scenario_results['resilience_metrics']['recovery_time']
plt.axvline(x=recovery_point, color='g', linestyle='--',
label=f'وقت التعافي: {scenario_results["resilience_metrics"]["recovery_time"]}')
plt.xlabel('الزمن')
plt.ylabel('تباين النظام')
plt.title(f'مرونة النظام - {scenario_results["scenario"]}')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
def create_comparative_dashboard(self, all_scenarios):
"""لوحة مقارنة بين جميع السيناريوهات"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()
for idx, (scenario_name, results) in enumerate(all_scenarios.items()):
analysis = results['analysis']
time = np.linspace(0, 100, len(analysis['competition_index']))
# نسبة التعاون إلى التنافس
cooperation_ratio = analysis['cooperation_index'] / \
(analysis['competition_index'] + 1e-10)
axes[idx].plot(time, cooperation_ratio, 'purple', linewidth=2)
axes[idx].set_title(f'{scenario_name}\nنسبة التعاون/التنافس')
axes[idx].set_ylabel('النسبة')
axes[idx].grid(True, alpha=0.3)
if idx >= 2:
axes[idx].set_xlabel('الزمن')
plt.tight_layout()
plt.show()
# تشغيل جميع السيناريوهات
def run_comprehensive_analysis(): # إنشاء النموذج model = CompetitiveCooperativeIntentModel(num_intent_types=4, network_size=50)
# إنشاء شبكة اجتماعية
G = nx.watts_strogatz_graph(50, 6, 0.3)
model.set_network_topology(G)
# مدير السيناريوهات
scenario_manager = ComplexInteractionScenarios(model)
# تشغيل السيناريوهات المختلفة
scenarios = {
'السيطرة التنافسية': scenario_manager.scenario_competitive_dominance(),
'التعايش التعاوني': scenario_manager.scenario_cooperative_coexistence(),
'السلوك التذبذبي': scenario_manager.scenario_oscillatory_behavior(),
'الصدمة الخارجية': scenario_manager.scenario_external_shock()
}
# التصور
viz = AdvancedInteractionVisualization(scenarios)
# رسم النتائج
for scenario_name, results in scenarios.items():
print(f"تحليل سيناريو: {scenario_name}")
viz.plot_competition_cooperation_evolution(results)
if 'attractor_analysis' in results:
viz.plot_phase_space_3d(results)
if 'resilience_metrics' in results:
viz.plot_system_resilience(results)
# لوحة المقارنة
viz.create_comparative_dashboard(scenarios)
return scenarios
comprehensive_results = run_comprehensive_analysis() class StrategicRecommendations: def init(self, analysis_results): self.results = analysis_results
def generate_strategic_insights(self):
"""توليد رؤى استراتيجية من تحليل التفاعلات"""
insights = []
for scenario_name, results in self.results.items():
analysis = results['analysis']
# متوسط التنافس والتعاون
avg_competition = np.mean(analysis['competition_index'])
avg_cooperation = np.mean(analysis['cooperation_index'])
if avg_competition > avg_cooperation:
insight = {
'scenario': scenario_name,
'issue': 'هيمنة التنافس',
'recommendation': 'تعزيز آليات التعاون عبر حوافز إيجابية',
'priority': 'عالية',
'expected_impact': 'كبير'
}
else:
insight = {
'scenario': scenario_name,
'issue': 'توازن تعاوني',
'recommendation': 'الحفاظ على الاستقرار الحالي',
'priority': 'متوسطة',
'expected_impact': 'متوسط'
}
insights.append(insight)
return insights
def predict_system_evolution(self, current_state, horizon=50):
"""التنبؤ بتطور النظام بناءً على الديناميكيات الحالية"""
# تحليل الاتجاهات الحالية
current_metrics = self.calculate_current_metrics(current_state)
# تنبؤ بسيط بناءً على الاتجاهات
predictions = {
'short_term': self.predict_short_term(current_metrics, horizon//2),
'long_term': self.predict_long_term(current_metrics, horizon)
}
return predictions
recommendation_engine = StrategicRecommendations(comprehensive_results) strategic_insights = recommendation_engine.generate_strategic_insights()
print("الرؤى الاستراتيجية المستخلصة:") for insight in strategic_insights: print(f"\nالسيناريو: {insight['scenario']}") print(f"المشكلة: {insight['issue']}") print(f"التوصية: {insight['recommendation']}") print(f"الأولوية: {insight['priority']}") import numpy as np from scipy import signal from scipy.fft import fft, fftfreq from sklearn.ensemble import IsolationForest import ruptures as rpt
class PhaseTransitionAnalyzer: def init(self, system_history, time_series): self.history = system_history # التاريخ الكامل للنظام self.time_series = time_series # السلاسل الزمنية للمتغيرات self.critical_points = [] self.phase_regimes = []
def multi_scale_entropy_analysis(self, data, scale_factor=10):
"""تحليل الإنتروبيا متعددة المقاييس لاكتشاف التعقيد"""
scales = range(1, scale_factor + 1)
entropy_values = []
for scale in scales:
# تقليل دقة البيانات
scaled_data = self.coarse_grain_data(data, scale)
entropy = self.calculate_sample_entropy(scaled_data)
entropy_values.append(entropy)
return {
'scales': list(scales),
'entropy_values': entropy_values,
'complexity_index': np.trapz(entropy_values)
}
def coarse_grain_data(self, data, scale):
"""تقسيم البيانات إلى مقاييس خشنة"""
n = len(data) // scale
coarse = np.zeros(n)
for i in range(n):
coarse[i] = np.mean(data[i*scale:(i+1)*scale])
return coarse
def calculate_sample_entropy(self, data, m=2, r=0.2):
"""حساب إنتروبيا العينة (Sample Entropy)"""
n = len(data)
std_data = np.std(data)
if std_data == 0:
return 0
r_val = r * std_data
def _maxdist(x_i, x_j):
return max([abs(ua - va) for ua, va in zip(x_i, x_j)])
def _phi(m):
x = [[data[j] for j in range(i, i + m)] for i in range(n - m + 1)]
C = 0
for i in range(len(x)):
for j in range(len(x)):
if i != j and _maxdist(x[i], x[j]) <= r_val:
C += 1
return C / ((n - m) * (n - m + 1))
if n - m + 1 <= 0:
return 0
return -np.log(_phi(m + 1) / _phi(m)) if _phi(m) != 0 else 0
def detect_critical_slowing_down(self, data, window_size=50):
"""كشف التباطؤ الحرج (إشارة مبكرة للتحول الطوري)"""
n = len(data)
variance = np.zeros(n - window_size)
autocorrelation = np.zeros(n - window_size)
for i in range(n - window_size):
window = data[i:i + window_size]
variance[i] = np.var(window)
# حساب ارتباط ذاتي من الرتبة 1
if len(window) > 1:
autocorrelation[i] = np.corrcoef(window[:-1], window[1:])[0, 1]
else:
autocorrelation[i] = 0
return {
'variance_trend': variance,
'autocorrelation_trend': autocorrelation,
'critical_slowing_signals': variance > np.percentile(variance, 75)
}
def fisher_information_analysis(self, data, window_size=30):
"""تحليل معلومات فيشر لاكتشاف التحولات الطورية"""
n = len(data)
fisher_info = np.zeros(n - window_size)
for i in range(n - window_size):
window = data[i:i + window_size]
# تقدير كثافة الاحتمال (مبسط)
hist, bin_edges = np.histogram(window, bins=10, density=True)
pdf = hist / np.sum(hist)
# حساب مشتق كثافة الاحتمال
pdf_gradient = np.gradient(pdf)
# معلومات فيشر
with np.errstate(divide='ignore', invalid='ignore'):
fi = np.sum((pdf_gradient ** 2) / pdf)
fisher_info[i] = fi if not np.isnan(fi) and not np.isinf(fi) else 0
return fisher_info
def wavelet_analysis(self, data, scales=np.arange(1, 50)):
"""تحويل المويجات لاكتشاف الأنماط متعددة المقاييس"""
# تنفيذ مبسط لتحويل المويجات المتقطع
n = len(data)
wavelet_coeffs = np.zeros((len(scales), n))
for i, scale in enumerate(scales):
# مويجة مكسيكية الوشاح (Mexican Hat) مبسطة
wavelet = self.mexican_hat_wavelet(scale, n)
wavelet_coeffs[i] = np.convolve(data, wavelet, mode='same')
return {
'scales': scales,
'wavelet_coeffs': wavelet_coeffs,
'energy_spectrum': np.sum(wavelet_coeffs**2, axis=1)
}
def mexican_hat_wavelet(self, scale, n):
"""مويجة مكسيكية الوشاح"""
t = np.linspace(-5, 5, n)
return (2/(np.sqrt(3*scale)*np.pi**0.25)) * (1 - (t/scale)**2) * np.exp(-t**2/(2*scale**2))
def comprehensive_phase_analysis(self):
"""تحليل طوري شامل يجمع بين الطرق المتعددة"""
K, N, T = self.history.shape
critical_points_all = []
phase_regimes = []
# تحليل لكل نوع نية
for k in range(K):
# متوسط عبر العقد
intent_series = np.mean(self.history[k], axis=0)
# 1. تحليل الإنتروبيا متعددة المقاييس
entropy_analysis = self.multi_scale_entropy_analysis(intent_series)
# 2. كشف التباطؤ الحرج
slowing_analysis = self.detect_critical_slowing_down(intent_series)
# 3. تحليل معلومات فيشر
fisher_info = self.fisher_information_analysis(intent_series)
# 4. تحويل المويجات
wavelet_analysis = self.wavelet_analysis(intent_series)
# دمج النتائج لاكتشاف النقاط الحرجة
critical_points = self.fusion_critical_point_detection(
entropy_analysis, slowing_analysis, fisher_info, wavelet_analysis
)
critical_points_all.extend([(k, t) for t in critical_points])
# تحديد الأنظمة الطورية
regimes = self.identify_phase_regimes(intent_series, critical_points)
phase_regimes.append(regimes)
self.critical_points = critical_points_all
self.phase_regimes = phase_regimes
return {
'critical_points': critical_points_all,
'phase_regimes': phase_regimes,
'transition_probabilities': self.calculate_transition_probabilities(phase_regimes)
}
def fusion_critical_point_detection(self, entropy_analysis, slowing_analysis, fisher_info, wavelet_analysis):
"""دمج كواشف متعددة لاكتشاف النقاط الحرجة"""
# تطبيق خوارزمية كشف التغيرات
algorithm = rpt.Pelt(model="rbf").fit(np.array(entropy_analysis['entropy_values']).reshape(-1, 1))
change_points = algorithm.predict(pen=10)
# تصفية النقاط ذات الأهمية الإحصائية
significant_changes = []
for cp in change_points:
if cp < len(fisher_info):
if fisher_info[cp] > np.percentile(fisher_info, 75):
significant_changes.append(cp)
return significant_changes
def identify_phase_regimes(self, data, critical_points):
"""تحديد الأنظمة الطورية بناءً على النقاط الحرجة"""
regimes = []
start = 0
for cp in sorted(critical_points):
if cp > start:
regime_data = data[start:cp]
regime_type = self.classify_regime(regime_data)
regimes.append({
'start': start,
'end': cp,
'type': regime_type,
'stability': np.std(regime_data),
'mean_value': np.mean(regime_data)
})
start = cp
# النظام الأخير
if start < len(data):
regime_data = data[start:]
regime_type = self.classify_regime(regime_data)
regimes.append({
'start': start,
'end': len(data),
'type': regime_type,
'stability': np.std(regime_data),
'mean_value': np.mean(regime_data)
})
return regimes
def classify_regime(self, data):
"""تصنيف النظام الطوري"""
std = np.std(data)
mean = np.mean(data)
if std < 0.1:
return "مستقر"
elif std > 0.3:
return "فوضوي"
elif np.mean(np.abs(np.diff(data))) > 0.05:
return "تذبذبي"
else:
return "انتقالي"
def calculate_transition_probabilities(self, phase_regimes):
"""حساب احتمالات الانتقال بين الأنظمة الطورية"""
transitions = {}
for k in range(len(phase_regimes)):
regimes = phase_regimes[k]
for i in range(len(regimes) - 1):
from_type = regimes[i]['type']
to_type = regimes[i + 1]['type']
key = (from_type, to_type)
transitions[key] = transitions.get(key, 0) + 1
# تحويل إلى احتمالات
total_transitions = sum(transitions.values())
probabilities = {k: v / total_transitions for k, v in transitions.items()}
return probabilities
import torch
import torch.nn as nn from torch.utils.data import Dataset, DataLoader import warnings warnings.filterwarnings('ignore')
class SystemEvolutionPredictor: def init(self, input_dim, prediction_horizon, hidden_dim=128): self.input_dim = input_dim self.prediction_horizon = prediction_horizon self.hidden_dim = hidden_dim self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# نموذج LSTM للتنبؤ بالسلاسل الزمنية
self.model = TemporalFusionTransformer(input_dim, hidden_dim, prediction_horizon)
self.model.to(self.device)
def prepare_training_data(self, historical_data, window_size=100):
"""تحضير بيانات التدريب للنموذج"""
sequences = []
targets = []
for i in range(len(historical_data) - window_size - self.prediction_horizon):
seq = historical_data[i:i + window_size]
target = historical_data[i + window_size:i + window_size + self.prediction_horizon]
sequences.append(seq)
targets.append(target)
return np.array(sequences), np.array(targets)
class TemporalFusionTransformer(nn.Module): """محول الاندماج الزمني للتنبؤ متعدد الخطوات""" def init(self, input_dim, hidden_dim, output_horizon, num_heads=8, num_layers=3): super().init() self.input_dim = input_dim self.hidden_dim = hidden_dim self.output_horizon = output_horizon self.num_heads = num_heads
# طبقة الترميز
self.encoder = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.2)
# آلة الانتباه
self.attention = nn.MultiheadAttention(hidden_dim, num_heads, dropout=0.1)
# طبقات كاملة الاتصال
self.decoder = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim * 2),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, output_horizon * input_dim)
)
def forward(self, x):
# x shape: (batch, sequence, features)
batch_size, seq_len, _ = x.shape
# الترميز
encoded, (hidden, cell) = self.encoder(x)
# آلة الانتباه
attended, _ = self.attention(encoded, encoded, encoded)
# استخدام آخر حالة مخفية للتنبؤ
last_hidden = attended[:, -1, :]
# فك الترميز
output = self.decoder(last_hidden)
output = output.view(batch_size, self.output_horizon, self.input_dim)
return output
class HybridPredictor: """تنبؤ هجين يجمع بين النماذج الفيزيائية والتعلم الآلي""" def init(self, physical_model, ml_model): self.physical_model = physical_model self.ml_model = ml_model self.uncertainty_estimator = UncertaintyEstimator()
def predict_system_evolution(self, current_state, horizon=50, num_samples=1000):
"""التنبؤ بتطور النظام مع تقدير عدم اليقين"""
# التنبؤ بالنموذج الفيزيائي
physical_pred = self.physical_model.simulate_interactions(
T=horizon, initial_conditions=current_state
)
# التنبؤ بنموذج التعلم الآلي
ml_input = self.prepare_ml_input(current_state)
ml_pred = self.ml_model.predict(ml_input)
# دمج التنبؤات
combined_pred = self.fusion_predictions(physical_pred, ml_pred)
# تقدير عدم اليقين
uncertainty = self.uncertainty_estimator.estimate_uncertainty(
combined_pred, num_samples
)
return {
'point_prediction': combined_pred,
'uncertainty_intervals': uncertainty,
'physical_component': physical_pred,
'ml_component': ml_pred,
'confidence_score': self.calculate_confidence(uncertainty)
}
def fusion_predictions(self, physical_pred, ml_pred):
"""دمج تنبؤات النماذج المختلفة"""
# وزن ديناميكي بناءً على أداء النماذج التاريخي
physical_weight = 0.6
ml_weight = 0.4
return physical_weight * physical_pred + ml_weight * ml_pred
def calculate_confidence(self, uncertainty):
"""حساب درجة الثقة في التنبؤ"""
avg_uncertainty = np.mean([u['width'] for u in uncertainty])
return max(0, 1 - avg_uncertainty)
class UncertaintyEstimator: """تقدير عدم اليقين في التنبؤات باستخدام طرق Bayesian""" def init(self): self.methods = ['bootstrap', 'mc_dropout', 'ensemble']
def estimate_uncertainty(self, predictions, num_samples):
"""تقدير فترات عدم اليقين"""
uncertainties = []
for method in self.methods:
if method == 'bootstrap':
uncertainty = self.bootstrap_uncertainty(predictions, num_samples)
elif method == 'mc_dropout':
uncertainty = self.monte_carlo_uncertainty(predictions, num_samples)
else:
uncertainty = self.ensemble_uncertainty(predictions, num_samples)
uncertainties.append(uncertainty)
# دجم تقديرات عدم اليقين
return self.combine_uncertainties(uncertainties)
def bootstrap_uncertainty(self, predictions, num_samples):
"""عدم اليقين باستخدام Bootstrap"""
n = len(predictions)
bootstrap_samples = []
for _ in range(num_samples):
# عينة Bootstrap
indices = np.random.choice(n, n, replace=True)
sample = predictions[indices]
bootstrap_samples.append(np.mean(sample, axis=0))
bootstrap_samples = np.array(bootstrap_samples)
return {
'mean': np.mean(bootstrap_samples, axis=0),
'std': np.std(bootstrap_samples, axis=0),
'lower': np.percentile(bootstrap_samples, 2.5, axis=0),
'upper': np.percentile(bootstrap_samples, 97.5, axis=0),
'width': np.mean(np.percentile(bootstrap_samples, 97.5, axis=0) -
np.percentile(bootstrap_samples, 2.5, axis=0))
}
class EarlyWarningSystem:
"""نظام إنذار مبكر للتحولات الطورية الحرجة"""
def __init__(self, analyzer, predictor, warning_threshold=0.8):
self.analyzer = analyzer
self.predictor = predictor
self.warning_threshold = warning_threshold
self.warning_history = []
def monitor_system_health(self, current_state, history_window=100):
"""مراقبة صحة النظام واكتشاف إشارات الخطر المبكرة"""
# تحليل المؤشرات المبكرة
early_indicators = self.calculate_early_indicators(current_state, history_window)
# حساب مؤشر الخطر
risk_score = self.calculate_risk_score(early_indicators)
# التحقق من وجود إشارات إنذار
warnings = self.check_warning_signals(early_indicators, risk_score)
# تحديث سجل الإنذارات
self.warning_history.append({
'timestamp': len(self.warning_history),
'risk_score': risk_score,
'warnings': warnings,
'indicators': early_indicators
})
return {
'risk_score': risk_score,
'warnings': warnings,
'recommended_actions': self.generate_recommendations(warnings, risk_score)
}
def calculate_early_indicators(self, current_state, history_window):
"""حساب المؤشرات المبكرة للتحولات الطورية"""
indicators = {}
# 1. التباطؤ الحرج
slowing_data = np.mean(current_state, axis=1)[-history_window:]
slowing_analysis = self.analyzer.detect_critical_slowing_down(slowing_data)
indicators['critical_slowing'] = np.mean(slowing_analysis['variance_trend'])
# 2. تعقيد النظام
complexity_analysis = self.analyzer.multi_scale_entropy_analysis(slowing_data)
indicators['complexity'] = complexity_analysis['complexity_index']
# 3. معلومات فيشر
fisher_info = self.analyzer.fisher_information_analysis(slowing_data)
indicators['fisher_info'] = np.mean(fisher_info)
# 4. التذبذبية
spectral_density = self.spectral_analysis(slowing_data)
indicators['oscillation_power'] = np.sum(spectral_density['power'][:10]) # الترددات المنخفضة
return indicators
def spectral_analysis(self, data):
"""التحليل الطيفي للبيانات"""
n = len(data)
freqs = fftfreq(n)
fft_vals = fft(data)
power = np.abs(fft_vals) ** 2
return {
'frequencies': freqs[:n//2],
'power': power[:n//2]
}
def calculate_risk_score(self, indicators):
"""حساب درجة الخطر الكلية"""
weights = {
'critical_slowing': 0.3,
'complexity': 0.25,
'fisher_info': 0.25,
'oscillation_power': 0.2
}
# تطبيع المؤشرات
normalized_indicators = {}
for key, value in indicators.items():
# افتراض أن القيم العالية تشير إلى خطر أعلى
normalized_indicators[key] = min(value / 10, 1.0) # تطبيع إلى [0, 1]
# حساب درجة الخطر الموزونة
risk_score = 0
for key, weight in weights.items():
risk_score += normalized_indicators[key] * weight
return min(risk_score, 1.0)
def check_warning_signals(self, indicators, risk_score):
"""التحقق من إشارات الإنذار"""
warnings = []
if risk_score > self.warning_threshold:
warnings.append("خطر تحول طوري عالي")
if indicators['critical_slowing'] > 0.7:
warnings.append("إشارات تباطؤ حرج قوية")
if indicators['fisher_info'] > 0.8:
warnings.append("تغيرات سريعة في بنية النظام")
if indicators['oscillation_power'] > 0.6:
warnings.append("تذبذبية عالية قد تشير إلى عدم استقرار")
return warnings
def generate_recommendations(self, warnings, risk_score):
"""توليد توصيات بناءً على الإنذارات"""
recommendations = []
if risk_score > 0.8:
recommendations.append("تفعيل خطط الطوارئ والاستجابة السريعة")
recommendations.append("مراقبة النظام بشكل مكثف ومستمر")
if "إشارات تباطؤ حرج قوية" in warnings:
recommendations.append("تقليل الضغوط على النظام")
recommendations.append("تعزيز آليات الاستقرار الذاتي")
if "تغيرات سريعة في بنية النظام" in warnings:
recommendations.append("إعادة تقييم استراتيجيات التدخل")
recommendations.append("تحضير بدائل تشغيلية")
if not recommendations:
recommendations.append("لا حاجة لإجراءات فورية - متابعة المراقبة")
return recommendations
def generate_risk_report(self, lookback_period=50):
"""تقرير مفصل عن مخاطر النظام"""
recent_warnings = self.warning_history[-lookback_period:]
if not recent_warnings:
return {"risk_level": "منخفض", "trend": "مستقر"}
risk_scores = [w['risk_score'] for w in recent_warnings]
avg_risk = np.mean(risk_scores)
risk_trend = np.polyfit(range(len(risk_scores)), risk_scores, 1)[0]
risk_level = "مرتفع" if avg_risk > 0.7 else "متوسط" if avg_risk > 0.4 else "منخفض"
trend_direction = "تصاعدي" if risk_trend > 0.01 else "تنازلي" if risk_trend < -0.01 else "مستقر"
return {
'risk_level': risk_level,
'average_risk_score': avg_risk,
'trend': trend_direction,
'trend_strength': abs(risk_trend),
'recent_warnings': len([w for w in recent_warnings if w['risk_score'] > 0.6]),
'recommendations': self.generate_strategic_recommendations(risk_level, trend_direction)
}
class ComprehensiveSystemMonitor:
"""نظام مراقبة وتنبؤ شامل"""
def __init__(self, social_model):
self.model = social_model
self.analyzer = PhaseTransitionAnalyzer([], [])
self.predictor = SystemEvolutionPredictor(
input_dim=social_model.K,
prediction_horizon=30
)
self.early_warning = EarlyWarningSystem(self.analyzer, self.predictor)
self.monitoring_history = []
def run_comprehensive_analysis(self, T=200, analysis_interval=10):
"""تشغيل تحليل شامل مع مراقبة مستمرة"""
print("بدء التحليل الشامل للنظام...")
# محاكاة النظام
history = self.model.simulate_interactions(T=T)
K, N, T_total = history.shape
results = {
'critical_points': [],
'phase_regimes': [],
'predictions': [],
'warnings': [],
'risk_assessments': []
}
# تحليل عند فترات زمنية منتظمة
for t in range(0, T_total, analysis_interval):
print(f"تحليل الزمن {t}/{T_total}")
# البيانات الحالية
current_state = history[:, :, max(0, t-100):t]
if current_state.shape[2] > 50: # تأكد من وجود بيانات كافية
# تحديث المحلل
self.analyzer.history = current_state
self.analyzer.time_series = np.mean(current_state, axis=1)
# 1. تحليل التحولات الطورية
phase_analysis = self.analyzer.comprehensive_phase_analysis()
results['critical_points'].extend(phase_analysis['critical_points'])
results['phase_regimes'].append(phase_analysis['phase_regimes'])
# 2. التنبؤ بالتطور
current_flat = current_state[:, :, -1]
prediction = self.predict_system_future(current_flat, horizon=30)
results['predictions'].append(prediction)
# 3. الإنذار المبكر
warning_info = self.early_warning.monitor_system_health(current_state)
results['warnings'].append(warning_info)
# 4. تقييم المخاطر
risk_report = self.early_warning.generate_risk_report()
results['risk_assessments'].append(risk_report)
self.monitoring_history = results
return results
def predict_system_future(self, current_state, horizon=30):
"""التنبؤ بمستقبل النظام"""
# تحضير البيانات للتنبؤ
current_series = np.mean(current_state, axis=1) # متوسط عبر العقد
# تنبؤ بسيط (في التطبيق الحقيقي يستخدم النموذج الهجين)
future_trend = self.simple_trend_projection(current_series, horizon)
return {
'time_horizon': horizon,
'predicted_trajectory': future_trend,
'confidence_interval': self.calculate_confidence_interval(future_trend),
'likely_scenarios': self.generate_scenarios(future_trend)
}
def simple_trend_projection(self, data, horizon):
"""إسقاط اتجاه بسيط"""
n = len(data)
projections = []
for i in range(len(data)):
# انحدار خطي للاتجاه الأخير
if n > 10:
x = np.arange(max(0, n-10), n)
y = data[i, max(0, n-10):]
if len(x) == len(y) and len(x) > 1:
slope = np.polyfit(x, y, 1)[0]
projection = data[i, -1] + slope * np.arange(1, horizon + 1)
projections.append(projection)
else:
projections.append(np.full(horizon, data[i, -1]))
else:
projections.append(np.full(horizon, data[i, -1]))
return np.array(projections)
def generate_scenarios(self, base_prediction):
"""توليد سيناريوهات محتملة"""
scenarios = {
'optimistic': base_prediction * 1.2, # تحسن 20%
'pessimistic': base_prediction * 0.8, # تدهور 20%
'volatile': base_prediction * (1 + 0.3 * np.random.randn(*base_prediction.shape))
}
return scenarios
def create_executive_summary(self):
"""إنشاء ملخص تنفيذي للقرارات"""
if not self.monitoring_history:
return "لا توجد بيانات كافية للتحليل"
latest_risk = self.monitoring_history['risk_assessments'][-1] if self.monitoring_history['risk_assessments'] else {}
critical_points_count = len(self.monitoring_history['critical_points'])
summary = {
'overall_risk_level': latest_risk.get('risk_level', 'غير محدد'),
'critical_points_detected': critical_points_count,
'system_stability': 'عالية' if latest_risk.get('average_risk_score', 1) < 0.3 else 'متوسطة' if latest_risk.get('average_risk_score', 1) < 0.6 else 'منخفضة',
'key_recommendations': latest_risk.get('recommendations', []),
'monitoring_period': len(self.monitoring_history['warnings']),
'prediction_confidence': np.mean([p.get('confidence_interval', 0.5) for p in self.monitoring_history['predictions']])
}
return summary
def run_comprehensive_monitoring(): # إنشاء النموذج الاجتماعي social_model = CompetitiveCooperativeIntentModel(num_intent_types=4, network_size=100) G = nx.watts_strogatz_graph(100, 8, 0.3) social_model.set_network_topology(G)
# إنشاء نظام المراقبة الشامل
monitor = ComprehensiveSystemMonitor(social_model)
# تشغيل التحليل
results = monitor.run_comprehensive_analysis(T=300, analysis_interval=20)
# إنشاء التقارير
executive_summary = monitor.create_executive_summary()
print("=" * 50)
print("الملخص التنفيذي للرصد الشامل")
print("=" * 50)
for key, value in executive_summary.items():
print(f"{key}: {value}")
return monitor, results
monitor, results = run_comprehensive_monitoring() class InteractiveMonitoringDashboard: """لوحة تحكم تفاعلية لمراقبة النظام""" def init(self, monitor): self.monitor = monitor self.fig = None
def create_real_time_dashboard(self):
"""إنشاء لوحة تحكم في الوقت الحقيقي"""
import plotly.graph_objects as go
from plotly.subplots import make_subplots
fig = make_subplots(
rows=3, cols=2,
subplot_titles=[
'تطور النوايا عبر الزمن',
'النقاط الحرجة المكتشفة',
'درجة الخطر والإنذارات',
'التنبؤ بالتطور المستقبلي',
'تحليل الأنظمة الطورية',
'مؤشرات الإنذار المبكر'
],
specs=[
[{"colspan": 2}, None],
[{}, {}],
[{}, {}]
],
vertical_spacing=0.08,
horizontal_spacing=0.08
)
# إضافة الرسوم البيانية
self._add_intent_evolution_plot(fig)
self._add_critical_points_plot(fig)
self._add_risk_monitoring_plot(fig)
self._add_prediction_plot(fig)
self._add_phase_analysis_plot(fig)
self._add_early_warning_plot(fig)
fig.update_layout(
height=1200,
title_text="لوحة المراقبة الشاملة للتحولات الطورية والتنبؤ",
showlegend=True
)
self.fig = fig
return fig
def _add_intent_evolution_plot(self, fig):
"""إضافة رسم تطور النوايا"""
history = self.monitor.model.history
time = range(history.shape[2])
for k in range(history.shape[0]):
intent_series = np.mean(history[k], axis=0)
fig.add_trace(
go.Scatter(x=time, y=intent_series,
name=f'نية {self.monitor.model.intent_types[k]}',
line=dict(width=2)),
row=1, col=1
)
import dash
from dash import dcc, html, Input, Output, callback import plotly.graph_objects as go from plotly.subplots import make_subplots import dash_bootstrap_components as dbc import numpy as np import pandas as pd from datetime import datetime, timedelta
class RealTimeMonitoringDashboard: """لوحة مراقبة شاملة في الوقت الحقيقي"""
def __init__(self, system_monitor):
self.monitor = system_monitor
self.app = dash.Dash(__name__, external_stylesheets=[dbc.themes.DARKLY])
self.setup_layout()
self.setup_callbacks()
# بيانات محاكاة للعرض
self.sample_data = self.generate_sample_data()
def setup_layout(self):
"""إعداد تخطيط لوحة التحكم"""
self.app.layout = dbc.Container([
# رأس اللوحة
dbc.Row([
dbc.Col([
html.H1("🏢 لوحة المراقبة الشاملة للتحولات الطورية",
className="text-center mb-4",
style={'color': 'white', 'fontWeight': 'bold'})
], width=12)
]),
# مؤشرات الأداء الرئيسية
dbc.Row([
dbc.Col([self.create_kpi_card("مستوى الخطر الحالي", "0.32", "success")], width=3),
dbc.Col([self.create_kpi_card("النقاط الحرجة", "3", "warning")], width=3),
dbc.Col([self.create_kpi_card("التنبؤ بالثقة", "87%", "info")], width=3),
dbc.Col([self.create_kpi_card("الإنذارات النشطة", "1", "danger")], width=3),
], className="mb-4"),
# الصف الأول من الرسوم البيانية
dbc.Row([
dbc.Col([
dcc.Graph(id='intent-evolution-plot'),
dcc.Interval(id='interval-update', interval=2000, n_intervals=0)
], width=8),
dbc.Col([
dcc.Graph(id='risk-gauge-plot'),
dcc.Graph(id='warning-indicators-plot')
], width=4)
], className="mb-4"),
# الصف الثاني من الرسوم البيانية
dbc.Row([
dbc.Col([
dcc.Graph(id='phase-transition-plot')
], width=6),
dbc.Col([
dcc.Graph(id='prediction-comparison-plot')
], width=6)
], className="mb-4"),
# الصف الثالث: تحليلات متقدمة
dbc.Row([
dbc.Col([
dcc.Graph(id='early-warning-plot')
], width=12)
]),
# لوحة التحكم
dbc.Row([
dbc.Col([
self.create_control_panel()
], width=12)
], className="mb-4")
], fluid=True)
def create_kpi_card(self, title, value, color):
"""إنشاء بطاقة مؤشر أداء رئيسي"""
colors = {
'success': '#00cc96',
'warning': '#ffa15a',
'danger': '#ef553b',
'info': '#636efa'
}
return dbc.Card([
dbc.CardBody([
html.H4(title, className="card-title", style={'color': 'white'}),
html.H2(value, className="card-text",
style={'color': colors.get(color, 'white'), 'fontWeight': 'bold'})
])
], color="dark", outline=True)
def create_control_panel(self):
"""إنشاء لوحة تحكم تفاعلية"""
return dbc.Card([
dbc.CardHeader("🎛️ لوحة التحكم", style={'color': 'white', 'fontWeight': 'bold'}),
dbc.CardBody([
dbc.Row([
dbc.Col([
html.Label("معدل التحديث:", style={'color': 'white'}),
dcc.Dropdown(
id='update-interval',
options=[
{'label': 'كل ثانيتين', 'value': 2000},
{'label': 'كل 5 ثوان', 'value': 5000},
{'label': 'كل 10 ثوان', 'value': 10000}
],
value=2000,
style={'color': 'black'}
)
], width=3),
dbc.Col([
html.Label("نافذة التنبؤ:", style={'color': 'white'}),
dcc.Slider(
id='prediction-horizon',
min=10, max=100, step=10, value=30,
marks={i: str(i) for i in range(10, 101, 10)}
)
], width=6),
dbc.Col([
html.Label("حساسية الإنذار:", style={'color': 'white'}),
dcc.Slider(
id='warning-sensitivity',
min=0.1, max=1.0, step=0.1, value=0.7,
marks={0.1: 'منخفضة', 0.5: 'متوسطة', 1.0: 'عالية'}
)
], width=3)
]),
dbc.Row([
dbc.Col([
dbc.Button("🔄 تحديث البيانات", id='refresh-button',
color="primary", className="mt-3")
], width=3),
dbc.Col([
dbc.Button("📊 توليد تقرير", id='report-button',
color="success", className="mt-3")
], width=3),
dbc.Col([
dbc.Button("⚠️ اختبار إنذار", id='test-alert-button',
color="warning", className="mt-3")
], width=3),
dbc.Col([
dbc.Button("🛑 إيقاف الطوارئ", id='emergency-button',
color="danger", className="mt-3")
], width=3)
])
])
], color="dark")
def setup_callbacks(self):
"""إعداد callback functions للتحديث التفاعلي"""
@self.app.callback(
[Output('intent-evolution-plot', 'figure'),
Output('risk-gauge-plot', 'figure'),
Output('warning-indicators-plot', 'figure'),
Output('phase-transition-plot', 'figure'),
Output('prediction-comparison-plot', 'figure'),
Output('early-warning-plot', 'figure')],
[Input('interval-update', 'n_intervals'),
Input('refresh-button', 'n_clicks')]
)
def update_dashboard(n_intervals, n_clicks):
"""تحديث جميع الرسوم البيانية"""
# في التطبيق الحقيقي، يتم استبدال هذا ببيانات حية من النظام
updated_data = self.generate_updated_data()
fig1 = self.create_intent_evolution_plot(updated_data)
fig2 = self.create_risk_gauge(updated_data)
fig3 = self.create_warning_indicators(updated_data)
fig4 = self.create_phase_transition_plot(updated_data)
fig5 = self.create_prediction_comparison(updated_data)
fig6 = self.create_early_warning_plot(updated_data)
return fig1, fig2, fig3, fig4, fig5, fig6
@self.app.callback(
Output('interval-update', 'interval'),
[Input('update-interval', 'value')]
)
def update_interval_rate(interval):
"""تحديث معدل التحديث"""
return interval
def create_intent_evolution_plot(self, data):
"""رسم تطور النوايا عبر الزمن"""
fig = make_subplots(rows=2, cols=1, vertical_spacing=0.1,
subplot_titles=['تطور أنواع النوايا', 'المؤشرات المركبة'])
# أنواع النوايا
intent_types = ['إيجابي', 'سلبي', 'محايد', 'استفساري']
time_points = data['time_points']
for i, intent in enumerate(intent_types):
fig.add_trace(
go.Scatter(x=time_points, y=data['intents'][i],
name=intent, line=dict(width=2)),
row=1, col=1
)
# المؤشرات المركبة
fig.add_trace(
go.Scatter(x=time_points, y=data['cooperation_index'],
name='مؤشر التعاون', line=dict(color='green', width=2)),
row=2, col=1
)
fig.add_trace(
go.Scatter(x=time_points, y=data['competition_index'],
name='مؤشر التنافس', line=dict(color='red', width=2)),
row=2, col=1
)
fig.update_layout(
height=400,
title_text="التطور الزمني للنوايا والتفاعلات",
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
font=dict(color='white'),
showlegend=True
)
return fig
def create_risk_gauge(self, data):
"""مقياس خطر تفاعلي"""
current_risk = data['current_risk']
fig = go.Figure(go.Indicator(
mode = "gauge+number+delta",
value = current_risk,
domain = {'x': [0, 1], 'y': [0, 1]},
title = {'text': "مستوى الخطر الحالي", 'font': {'color': 'white', 'size': 20}},
delta = {'reference': 0.3, 'increasing': {'color': "red"}},
gauge = {
'axis': {'range': [0, 1], 'tickwidth': 1, 'tickcolor': "white"},
'bar': {'color': "darkblue"},
'bgcolor': "black",
'borderwidth': 2,
'bordercolor': "gray",
'steps': [
{'range': [0, 0.3], 'color': 'green'},
{'range': [0.3, 0.7], 'color': 'yellow'},
{'range': [0.7, 1], 'color': 'red'}],
'threshold': {
'line': {'color': "white", 'width': 4},
'thickness': 0.75,
'value': current_risk}}))
fig.update_layout(
height=300,
paper_bgcolor='rgba(0,0,0,0)',
font={'color': "white", 'family': "Arial"}
)
return fig
def create_warning_indicators(self, data):
"""مؤشرات الإنذار المبكر"""
indicators = data['warning_indicators']
fig = go.Figure()
fig.add_trace(go.Indicator(
mode = "gauge+number",
value = indicators['critical_slowing'],
title = {'text': "التباطؤ الحرج", 'font': {'color': 'white'}},
domain = {'row': 0, 'column': 0},
gauge = {'axis': {'range': [0, 1]},
'bar': {'color': "darkblue"},
'steps': [{'range': [0, 0.7], 'color': "lightgray"},
{'range': [0.7, 1], 'color': "red"}]}
))
fig.add_trace(go.Indicator(
mode = "gauge+number",
value = indicators['fisher_info'],
title = {'text': "معلومات فيشر", 'font': {'color': 'white'}},
domain = {'row': 0, 'column': 1},
gauge = {'axis': {'range': [0, 1]},
'bar': {'color': "darkblue"},
'steps': [{'range': [0, 0.6], 'color': "lightgray"},
{'range': [0.6, 1], 'color': "orange"}]}
))
fig.update_layout(
grid = {'rows': 1, 'columns': 2, 'pattern': "independent"},
height=300,
paper_bgcolor='rgba(0,0,0,0)',
font={'color': "white"}
)
return fig
def create_phase_transition_plot(self, data):
"""رسم التحولات الطورية والنقاط الحرجة"""
fig = go.Figure()
# إضافة المناطق الطورية
phases = data['phase_regimes']
colors = {'مستقر': 'green', 'انتقالي': 'yellow', 'فوضوي': 'red', 'تذبذبي': 'orange'}
for phase in phases:
fig.add_vrect(
x0=phase['start'], x1=phase['end'],
fillcolor=colors[phase['type']], opacity=0.2,
layer="below", line_width=0,
annotation_text=phase['type'],
annotation_position="top left"
)
# إشارات الإنذار المبكر
warning_signals = data['early_warning_signals']
fig.add_trace(
go.Scatter(x=warning_signals['time'], y=warning_signals['intensity'],
mode='markers', name='إشارات إنذار',
marker=dict(size=10, color='red', symbol='triangle-up'))
)
# النقاط الحرجة
critical_points = data['critical_points']
fig.add_trace(
go.Scatter(x=critical_points, y=[0.5]*len(critical_points),
mode='markers', name='نقاط حرجة',
marker=dict(size=8, color='purple', symbol='diamond'))
)
fig.update_layout(
title="التحولات الطورية والنقاط الحرجة",
xaxis_title="الزمن",
yaxis_title="شدة الإشارة",
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
font=dict(color='white')
)
return fig
def create_prediction_comparison(self, data):
"""مقارنة تنبؤات النماذج الهجينة"""
fig = go.Figure()
# البيانات التاريخية
history = data['historical_data']
fig.add_trace(
go.Scatter(x=data['time_points'], y=history,
mode='lines', name='بيانات تاريخية',
line=dict(color='blue', width=2))
)
# تنبؤات النموذج الفيزيائي
physical_pred = data['physical_prediction']
fig.add_trace(
go.Scatter(x=data['prediction_time'], y=physical_pred,
mode='lines', name='تنبؤ فيزيائي',
line=dict(color='green', width=2, dash='dash'))
)
# تنبؤات نموذج الذكاء الاصطناعي
ml_pred = data['ml_prediction']
fig.add_trace(
go.Scatter(x=data['prediction_time'], y=ml_pred,
mode='lines', name='تنبؤ ذكاء اصطناعي',
line=dict(color='orange', width=2, dash='dash'))
)
# التنبؤ الهجين
hybrid_pred = data['hybrid_prediction']
fig.add_trace(
go.Scatter(x=data['prediction_time'], y=hybrid_pred,
mode='lines', name='تنبؤ هجين',
line=dict(color='red', width=3))
)
# فترات الثقة
confidence_intervals = data['confidence_intervals']
fig.add_trace(
go.Scatter(x=data['prediction_time'],
y=confidence_intervals['upper'],
mode='lines', line=dict(width=0),
showlegend=False)
)
fig.add_trace(
go.Scatter(x=data['prediction_time'],
y=confidence_intervals['lower'],
mode='lines', line=dict(width=0),
fill='tonexty',
fillcolor='rgba(255,0,0,0.2)',
name='فترة الثقة')
)
fig.update_layout(
title="مقارنة تنبؤات النماذج الهجينة",
xaxis_title="الزمن",
yaxis_title="قيمة المؤشر",
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
font=dict(color='white')
)
return fig
def create_early_warning_plot(self, data):
"""رسم الإنذار المبكر المتكامل"""
fig = make_subplots(rows=2, cols=1, vertical_spacing=0.1,
subplot_titles=['مؤشرات الإنذار المبكر', 'تحليل المخاطر التراكمي'])
# مؤشرات الإنذار المبكر
warning_data = data['early_warning_analysis']
indicators = ['التباطؤ الحرج', 'تعقيد النظام', 'معلومات فيشر', 'التذبذبية']
for i, indicator in enumerate(indicators):
fig.add_trace(
go.Scatter(x=data['time_points'], y=warning_data[i],
name=indicator, line=dict(width=2)),
row=1, col=1
)
# عتبة الإنذار
fig.add_hline(y=0.7, line_dash="dash", line_color="red",
annotation_text="عتبة الإنذار", row=1, col=1)
# تحليل المخاطر التراكمي
risk_analysis = data['cumulative_risk']
fig.add_trace(
go.Scatter(x=data['time_points'], y=risk_analysis['risk_score'],
name='درجة الخطر', line=dict(color='red', width=3)),
row=2, col=1
)
# مناطق الخطر
fig.add_hrect(y0=0.7, y1=1.0, line_width=0, fillcolor="red", opacity=0.2,
annotation_text="خطر عالي", row=2, col=1)
fig.add_hrect(y0=0.3, y1=0.7, line_width=0, fillcolor="yellow", opacity=0.2,
annotation_text="خطر متوسط", row=2, col=1)
fig.add_hrect(y0=0.0, y1=0.3, line_width=0, fillcolor="green", opacity=0.2,
annotation_text="خطر منخفض", row=2, col=1)
fig.update_layout(
height=500,
title_text="نظام الإنذار المبكر المتكامل",
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
font=dict(color='white'),
showlegend=True
)
return fig
def generate_sample_data(self):
"""توليد بيانات نموذجية للعرض"""
time_points = np.linspace(0, 100, 100)
return {
'time_points': time_points,
'intents': [
np.sin(time_points * 0.1) + 0.5 + np.random.normal(0, 0.1, 100), # إيجابي
np.cos(time_points * 0.1) + 0.3 + np.random.normal(0, 0.1, 100), # سلبي
np.sin(time_points * 0.05) + 0.4 + np.random.normal(0, 0.1, 100), # محايد
np.cos(time_points * 0.08) + 0.6 + np.random.normal(0, 0.1, 100) # استفساري
],
'cooperation_index': np.sin(time_points * 0.05) + 0.5,
'competition_index': np.cos(time_points * 0.05) + 0.5,
'current_risk': 0.32,
'warning_indicators': {
'critical_slowing': 0.45,
'fisher_info': 0.68
},
'phase_regimes': [
{'start': 0, 'end': 30, 'type': 'مستقر'},
{'start': 30, 'end': 60, 'type': 'انتقالي'},
{'start': 60, 'end': 80, 'type': 'تذبذبي'},
{'start': 80, 'end': 100, 'type': 'فوضوي'}
],
'early_warning_signals': {
'time': [25, 55, 75],
'intensity': [0.8, 0.9, 0.7]
},
'critical_points': [30, 60, 80],
'historical_data': np.sin(time_points * 0.1) + 0.5 + np.random.normal(0, 0.1, 100),
'prediction_time': np.linspace(100, 130, 30),
'physical_prediction': np.sin(np.linspace(100, 130, 30) * 0.1) + 0.5,
'ml_prediction': np.cos(np.linspace(100, 130, 30) * 0.1) + 0.5,
'hybrid_prediction': 0.6 * np.sin(np.linspace(100, 130, 30) * 0.1) + 0.4 * np.cos(np.linspace(100, 130, 30) * 0.1) + 0.5,
'confidence_intervals': {
'upper': np.sin(np.linspace(100, 130, 30) * 0.1) + 0.7,
'lower': np.sin(np.linspace(100, 130, 30) * 0.1) + 0.3
},
'early_warning_analysis': [
np.sin(time_points * 0.1) + 0.5, # التباطؤ الحرج
np.cos(time_points * 0.1) + 0.5, # تعقيد النظام
np.sin(time_points * 0.05) + 0.5, # معلومات فيشر
np.cos(time_points * 0.08) + 0.5 # التذبذبية
],
'cumulative_risk': {
'risk_score': np.sin(time_points * 0.02) * 0.3 + 0.5
}
}
def generate_updated_data(self):
"""توليد بيانات محدثة (محاكاة للتحديث في الوقت الحقيقي)"""
# في التطبيق الحقيقي، يتم استبدال هذا ببيانات حية من النظام
base_data = self.generate_sample_data()
# إضافة بعض التغيرات العشوائية لمحاكاة التحديث
noise = np.random.normal(0, 0.05, 100)
base_data['intents'] = [intent + noise for intent in base_data['intents']]
base_data['current_risk'] = max(0, min(1, base_data['current_risk'] + np.random.normal(0, 0.02)))
return base_data
def run_dashboard(self, debug=True, port=8050):
"""تشغيل لوحة التحكم"""
print(f"🚀 تشغيل لوحة المراقبة على: http://localhost:{port}")
self.app.run_server(debug=debug, port=port)
def launch_comprehensive_dashboard(): """تشغيل لوحة المراقبة الشاملة"""
# إنشاء نظام مراقبة (محاكاة)
class MockSystemMonitor:
def __init__(self):
self.data = {}
monitor = MockSystemMonitor()
dashboard = RealTimeMonitoringDashboard(monitor)
return dashboard
if name == "main": dashboard = launch_comprehensive_dashboard() dashboard.run_dashboard(debug=True, port=8050) class AdvancedHybridPredictor: """نظام تنبؤ هجين متقدم يجمع بين النماذج الفيزيائية والتعلم العميق"""
def __init__(self, physical_model, ml_models):
self.physical_model = physical_model
self.ml_models = ml_models # قائمة بنماذج تعلم آلي متعددة
self.ensemble_weights = self.initialize_ensemble_weights()
self.performance_history = []
def initialize_ensemble_weights(self):
"""تهيئة الأوزان الأولية للنماذج المجمعة"""
n_models = 1 + len(self.ml_models) # النموذج الفيزيائي + نماذج التعلم الآلي
return np.ones(n_models) / n_models
def dynamic_ensemble_prediction(self, current_state, horizon=30):
"""تنبؤ ديناميكي مجمع مع ضبط الأوزان تلقائياً"""
# التنبؤ بالنموذج الفيزيائي
physical_pred = self.physical_prediction(current_state, horizon)
# التنبؤ بنماذج التعلم الآلي
ml_predictions = []
for model in self.ml_models:
ml_pred = self.ml_prediction(model, current_state, horizon)
ml_predictions.append(ml_pred)
# دمج التنبؤات مع الأوزان الديناميكية
combined_pred = self.combine_predictions(physical_pred, ml_predictions)
# تحديث الأوزان بناءً على الأداء
self.update_ensemble_weights(physical_pred, ml_predictions, current_state)
return {
'physical': physical_pred,
'ml_models': ml_predictions,
'ensemble': combined_pred,
'weights': self.ensemble_weights.copy(),
'confidence': self.calculate_prediction_confidence(combined_pred)
}
def physical_prediction(self, current_state, horizon):
"""التنبؤ باستخدام النموذج الفيزيائي"""
# محاكاة النموذج الفيزيائي
try:
simulation = self.physical_model.simulate_interactions(
T=horizon,
initial_conditions=current_state
)
return np.mean(simulation, axis=(0, 1)) # متوسط عبر الأنواع والعقد
except:
# نموذج بدائي في حالة الخطأ
return np.ones(horizon) * np.mean(current_state)
def ml_prediction(self, model, current_state, horizon):
"""التنبؤ باستخدام نموذج تعلم آلي"""
try:
# تحضير البيانات للإدخال
input_data = self.prepare_ml_input(current_state)
if hasattr(model, 'predict'):
prediction = model.predict(input_data.reshape(1, -1))[0]
else:
# تنبؤ بدائي
prediction = np.ones(horizon) * np.mean(current_state)
return prediction[:horizon] # تأكد من الطول الصحيح
except Exception as e:
print(f"خطأ في تنبؤ التعلم الآلي: {e}")
return np.ones(horizon) * np.mean(current_state)
def combine_predictions(self, physical_pred, ml_predictions):
"""دمج التنبؤات المختلفة"""
all_predictions = [physical_pred] + ml_predictions
weighted_sum = np.zeros_like(physical_pred)
for i, pred in enumerate(all_predictions):
if len(pred) == len(physical_pred):
weighted_sum += self.ensemble_weights[i] * pred
return weighted_sum
def update_ensemble_weights(self, physical_pred, ml_predictions, actual_data):
"""تحديث الأوزان بناءً على دقة التنبؤات السابقة"""
if len(self.performance_history) < 2:
return
# حساب الأخطاء الأخيرة
recent_errors = []
all_predictions = [physical_pred] + ml_predictions
for pred in all_predictions:
if len(self.performance_history) >= len(pred):
actual = self.performance_history[-len(pred):]
error = np.mean((pred - actual) ** 2)
recent_errors.append(error)
if recent_errors:
# تحويل الأخطاء إلى أوزان (خطأ أقل → وزن أعلى)
errors = np.array(recent_errors)
weights = 1.0 / (errors + 1e-8)
weights = weights / np.sum(weights)
# تحديث سلس مع المعلمة السابقة
alpha = 0.1 # معلمة التعلم
self.ensemble_weights = alpha * weights + (1 - alpha) * self.ensemble_weights
def calculate_prediction_confidence(self, prediction):
"""حساب درجة الثقة في التنبؤ"""
# تقييم استقرار التنبؤ
prediction_std = np.std(prediction)
prediction_range = np.ptp(prediction) # peak-to-peak
# ثقة عالية عندما يكون التنبؤ مستقراً ومتسقاً
stability_confidence = 1.0 / (1.0 + prediction_std)
consistency_confidence = 1.0 / (1.0 + prediction_range)
# متوسط درجات الثقة
overall_confidence = (stability_confidence + consistency_confidence) / 2
return min(1.0, overall_confidence)
def uncertainty_quantification(self, predictions, num_samples=1000):
"""تقدير كمي لعدم اليقين في التنبؤات"""
# طريقة Bootstrap لتقدير عدم اليقين
bootstrap_samples = []
for _ in range(num_samples):
# عينة Bootstrap من التنبؤات
sample_idx = np.random.choice(len(predictions), size=len(predictions), replace=True)
bootstrap_sample = predictions[sample_idx]
bootstrap_samples.append(np.mean(bootstrap_sample))
bootstrap_samples = np.array(bootstrap_samples)
return {
'mean': np.mean(bootstrap_samples),
'std': np.std(bootstrap_samples),
'confidence_interval': {
'lower': np.percentile(bootstrap_samples, 2.5),
'upper': np.percentile(bootstrap_samples, 97.5)
},
'prediction_interval': {
'lower': np.percentile(bootstrap_samples, 5),
'upper': np.percentile(bootstrap_samples, 95)
}
}
class IntegratedEarlyWarningSystem:
"""نظام إنذار مبكر متكامل للتحولات الطورية"""
def __init__(self, predictors, threshold_config=None):
self.predictors = predictors
self.thresholds = threshold_config or self.default_thresholds()
self.warning_history = []
self.alert_levels = {
'low': {'color': 'green', 'action': 'متابعة عادية'},
'medium': {'color': 'yellow', 'action': 'مراقبة مكثفة'},
'high': {'color': 'orange', 'action': 'استعداد للطوارئ'},
'critical': {'color': 'red', 'action': 'تفعيل خطط الطوارئ'}
}
def default_thresholds(self):
"""العتبات الافتراضية للإنذارات"""
return {
'critical_slowing': 0.7,
'variance_increase': 0.6,
'autocorrelation': 0.8,
'fisher_information': 0.75,
'complexity_change': 0.65,
'prediction_uncertainty': 0.8
}
def comprehensive_risk_assessment(self, system_state, historical_data):
"""تقييم خطر شامل متعدد الأبعاد"""
risk_indicators = {}
# 1. تحليل التباطؤ الحرج
risk_indicators['critical_slowing'] = self.analyze_critical_slowing(historical_data)
# 2. تحليل التغير في التباين
risk_indicators['variance_increase'] = self.analyze_variance_changes(historical_data)
# 3. تحليل الارتباط الذاتي
risk_indicators['autocorrelation'] = self.analyze_autocorrelation(historical_data)
# 4. تحليل معلومات فيشر
risk_indicators['fisher_information'] = self.analyze_fisher_information(historical_data)
# 5. تحليل التعقيد
risk_indicators['complexity_change'] = self.analyze_complexity_changes(historical_data)
# 6. تحليل عدم اليقين في التنبؤ
risk_indicators['prediction_uncertainty'] = self.analyze_prediction_uncertainty(system_state)
# حساب درجة الخطر الكلية
overall_risk = self.calculate_overall_risk(risk_indicators)
# تحديد مستوى الإنذار
alert_level = self.determine_alert_level(overall_risk, risk_indicators)
return {
'risk_score': overall_risk,
'alert_level': alert_level,
'indicators': risk_indicators,
'timestamp': datetime.now(),
'recommended_actions': self.generate_actions(alert_level, risk_indicators)
}
def analyze_critical_slowing_down(self, data):
"""تحليل إشارات التباطؤ الحرج"""
if len(data) < 20:
return 0.0
# حساب التباين المتزايد
recent_variance = np.var(data[-10:])
historical_variance = np.var(data[:-10])
if historical_variance > 0:
variance_ratio = recent_variance / historical_variance
else:
variance_ratio = 1.0
# حساب الارتباط الذاتي المتزايد
recent_acf = self.calculate_autocorrelation(data[-10:], lag=1)
historical_acf = self.calculate_autocorrelation(data[:-10], lag=1)
acf_change = recent_acf - historical_acf
# مؤشر مركب للتباطؤ الحرج
slowing_indicator = min(1.0, 0.5 * variance_ratio + 0.5 * max(0, acf_change))
return slowing_indicator
def calculate_autocorrelation(self, data, lag=1):
"""حساب الارتباط الذاتي"""
if len(data) <= lag:
return 0.0
shifted = data[lag:]
original = data[:-lag]
if np.std(original) == 0 or np.std(shifted) == 0:
return 0.0
return np.corrcoef(original, shifted)[0, 1]
def analyze_variance_changes(self, data):
"""تحليل التغيرات في التباين"""
if len(data) < 30:
return 0.0
# تقسيم البيانات إلى فترات
n_periods = 3
period_length = len(data) // n_periods
variances = []
for i in range(n_periods):
start = i * period_length
end = (i + 1) * period_length
period_data = data[start:end]
variances.append(np.var(period_data))
# اتجاه التباين
if len(variances) >= 2:
trend = np.polyfit(range(len(variances)), variances, 1)[0]
normalized_trend = min(1.0, max(0.0, trend / (np.mean(variances) + 1e-8)))
else:
normalized_trend = 0.0
return normalized_trend
def analyze_fisher_information(self, data):
"""تحليل معلومات فيشر"""
if len(data) < 20:
return 0.0
# حساب مشتق كثافة الاحتمال (مبسط)
histogram, bin_edges = np.histogram(data, bins=10, density=True)
pdf = histogram / np.sum(histogram)
pdf_gradient = np.gradient(pdf)
with np.errstate(divide='ignore', invalid='ignore'):
fisher_info = np.sum((pdf_gradient ** 2) / pdf)
fisher_info = fisher_info if not np.isnan(fisher_info) and not np.isinf(fisher_info) else 0
return min(1.0, fisher_info / 10.0) # تطبيع
def analyze_complexity_changes(self, data):
"""تحليل التغيرات في تعقيد النظام"""
if len(data) < 50:
return 0.0
# حساب إنتروبيا العينة لمقاييس مختلفة
scales = [1, 2, 3, 4, 5]
entropy_values = []
for scale in scales:
coarse_data = self.coarse_grain(data, scale)
entropy = self.sample_entropy(coarse_data)
entropy_values.append(entropy)
# تغير التعقيد عبر المقاييس
complexity_change = np.std(entropy_values)
return min(1.0, complexity_change * 5.0) # تطبيع
def analyze_prediction_uncertainty(self, current_state):
"""تحليل عدم اليقين في التنبؤات"""
try:
# الحصول على تنبؤات من النماذج المختلفة
predictions = self.predictors.dynamic_ensemble_prediction(current_state)
# حساب تباين التنبؤات بين النماذج
all_preds = [predictions['physical']] + predictions['ml_models']
prediction_variance = np.var([np.mean(pred) for pred in all_preds])
return min(1.0, prediction_variance * 10.0) # تطبيع
except:
return 0.5 # قيمة افتراضية في حالة الخطأ
def coarse_grain(self, data, scale):
"""تقسيم البيانات إلى مقاييس خشنة"""
n = len(data) // scale
coarse = np.zeros(n)
for i in range(n):
coarse[i] = np.mean(data[i*scale:(i+1)*scale])
return coarse
def sample_entropy(self, data, m=2, r=0.2):
"""حساب إنتروبيا العينة"""
n = len(data)
if n <= m + 1:
return 0
std_data = np.std(data)
if std_data == 0:
return 0
r_val = r * std_data
def _maxdist(x_i, x_j):
return max([abs(ua - va) for ua, va in zip(x_i, x_j)])
def _phi(m):
x = [[data[j] for j in range(i, i + m)] for i in range(n - m + 1)]
C = 0
for i in range(len(x)):
for j in range(len(x)):
if i != j and _maxdist(x[i], x[j]) <= r_val:
C += 1
return C / ((n - m) * (n - m + 1))
return -np.log(_phi(m + 1) / _phi(m)) if _phi(m) != 0 else 0
def calculate_overall_risk(self, risk_indicators):
"""حساب درجة الخطر الكلية"""
weights = {
'critical_slowing': 0.25,
'variance_increase': 0.15,
'autocorrelation': 0.15,
'fisher_information': 0.20,
'complexity_change': 0.15,
'prediction_uncertainty': 0.10
}
overall_risk = 0
for indicator, weight in weights.items():
overall_risk += risk_indicators[indicator] * weight
return min(1.0, overall_risk)
def determine_alert_level(self, overall_risk, risk_indicators):
"""تحديد مستوى الإنذار"""
if overall_risk >= 0.8:
return 'critical'
elif overall_risk >= 0.6:
return 'high'
elif overall_risk >= 0.4:
return 'medium'
else:
return 'low'
def generate_actions(self, alert_level, risk_indicators):
"""توليد إجراءات مستحسنة بناءً على مستوى الإنذار"""
base_actions = self.alert_levels[alert_level]['action']
specific_actions = []
# إجراءات محددة بناءً على مؤشرات الخطر
if risk_indicators['critical_slowing'] > 0.7:
specific_actions.append("تعزيز آليات الاستقرار الذاتي")
if risk_indicators['prediction_uncertainty'] > 0.7:
specific_actions.append("تفعيل نماذج تنبؤ بديلة")
if risk_indicators['fisher_information'] > 0.7:
specific_actions.append("مراجعة استراتيجيات التدخل")
return [base_actions] + specific_actions
def monitor_system_continuously(self, data_stream, check_interval=60):
"""مراقبة مستمرة للإنذارات"""
while True:
try:
# الحصول على أحدث البيانات
current_data = data_stream.get_latest_data()
historical_data = data_stream.get_historical_data()
# تقييم المخاطر
risk_assessment = self.comprehensive_risk_assessment(
current_data, historical_data
)
# تسجيل الإنذار إذا لزم الأمر
if risk_assessment['alert_level'] in ['high', 'critical']:
self.trigger_alert(risk_assessment)
# تحديث سجل الإنذارات
self.warning_history.append(risk_assessment)
# الاحتفاظ فقط بالسجل الأخير
if len(self.warning_history) > 1000:
self.warning_history = self.warning_history[-1000:]
# الانتظار حتى الفحص التالي
time.sleep(check_interval)
except Exception as e:
print(f"خطأ في المراقبة المستمرة: {e}")
time.sleep(check_interval)
def trigger_alert(self, risk_assessment):
"""تفعيل إنذار عاجل"""
alert_message = f"""
🚨 إنذار نظام مراقبة التحولات الطورية 🚨
مستوى الخطر: {risk_assessment['alert_level'].upper()}
درجة الخطر: {risk_assessment['risk_score']:.2f}
الوقت: {risk_assessment['timestamp']}
المؤشرات الحرجة:
{self.format_risk_indicators(risk_assessment['indicators'])}
الإجراءات المستحسنة:
{' | '.join(risk_assessment['recommended_actions'])}
"""
print(alert_message)
# في التطبيق الحقيقي، إضافة:
# - إشعارات بريد إلكتروني
# - رسائل نصية
# - تكامل مع أنظمة المراقبة
# - تسجيل في قاعدة البيانات
def format_risk_indicators(self, indicators):
"""تنسيق مؤشرات الخطر للعرض"""
formatted = []
for indicator, value in indicators.items():
level = "🟢" if value < 0.4 else "🟡" if value < 0.7 else "🔴"
formatted.append(f"{level} {indicator}: {value:.2f}")
return "\n".join(formatted)
def launch_integrated_system():
"""تشغيل النظام المتكامل النهائي"""
print("🚀 بدء تشغيل النظام المتكامل للمراقبة والتنبؤ والإنذار...")
# 1. تهيئة النماذج
social_model = CompetitiveCooperativeIntentModel(num_intent_types=4, network_size=100)
# 2. إنشاء نظام التنبؤ الهجين
hybrid_predictor = AdvancedHybridPredictor(
physical_model=social_model,
ml_models=[MockMLModel() for _ in range(3)] # نماذج محاكاة
)
# 3. إنشاء نظام الإنذار المبكر
early_warning_system = IntegratedEarlyWarningSystem(
predictors=hybrid_predictor
)
# 4. تشغيل لوحة المراقبة
dashboard = RealTimeMonitoringDashboard(social_model)
print("✅ تم تهيئة جميع المكونات بنجاح")
print("📊 لوحة المراقبة جاهزة للتشغيل")
print("🤖 أنظمة التنبؤ والإنذار نشطة")
return {
'dashboard': dashboard,
'predictor': hybrid_predictor,
'warning_system': early_warning_system
}
class MockMLModel: """نموذج تعلم آلي محاكى للعرض""" def predict(self, X): return np.random.normal(0.5, 0.1, X.shape[1])
if name == "main": system = launch_integrated_system()
# تشغيل لوحة المراقبة (في خيط منفصل في التطبيق الحقيقي)
print("\n🌐 بدء تشغيل لوحة المراقبة...")
system['dashboard'].run_dashboard(debug=True, port=8050)
import asyncio
import aiohttp import psutil from datetime import datetime, timedelta import json import sqlite3 from typing import Dict, List, Optional import warnings warnings.filterwarnings('ignore')
class AdaptiveContinuousMonitor: """نظام مراقبة مستمرة ذاتي التكيف مع التغيرات الديناميكية"""
def __init__(self, config: Dict):
self.config = config
self.is_running = False
self.adaptation_history = []
self.performance_metrics = {}
self.anomaly_detector = AdaptiveAnomalyDetector()
self.resource_manager = ResourceManager()
# قواعد التكيف الذاتي
self.adaptation_rules = self._initialize_adaptation_rules()
def _initialize_adaptation_rules(self):
"""تهيئة قواعد التكيف الذاتي"""
return {
'high_cpu_usage': {
'condition': lambda metrics: metrics.get('cpu_percent', 0) > 80,
'action': self._reduce_monitoring_frequency,
'priority': 'high'
},
'memory_pressure': {
'condition': lambda metrics: metrics.get('memory_percent', 0) > 75,
'action': self._optimize_data_retention,
'priority': 'high'
},
'network_latency': {
'condition': lambda metrics: metrics.get('network_latency', 0) > 1000,
'action': self._enable_compression,
'priority': 'medium'
},
'data_volume_increase': {
'condition': lambda metrics: metrics.get('data_volume_growth', 0) > 50,
'action': self._scale_storage,
'priority': 'medium'
},
'anomaly_spike': {
'condition': lambda metrics: metrics.get('anomaly_count', 0) > 10,
'action': self._increase_monitoring_intensity,
'priority': 'critical'
}
}
async def start_continuous_monitoring(self):
"""بدء المراقبة المستمرة"""
self.is_running = True
print("🚀 بدء نظام المراقبة المستمرة ذاتية التكيف...")
# تشغيل مهام المراقبة المتوازية
tasks = [
asyncio.create_task(self._system_health_monitor()),
asyncio.create_task(self._data_stream_monitor()),
asyncio.create_task(self._performance_optimizer()),
asyncio.create_task(self._adaptive_rule_engine()),
asyncio.create_task(self._real_time_analyzer())
]
try:
await asyncio.gather(*tasks)
except Exception as e:
print(f"❌ خطأ في نظام المراقبة: {e}")
finally:
self.is_running = False
async def _system_health_monitor(self):
"""مراقبة صحة النظام والموارد"""
while self.is_running:
try:
# جمع مقاييس صحة النظام
system_metrics = {
'timestamp': datetime.now(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent,
'network_io': psutil.net_io_counters(),
'active_processes': len(psutil.pids()),
'system_load': psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg') else 0
}
# تحديث مقاييس الأداء
self.performance_metrics.update(system_metrics)
# تسجيل البيانات
await self._store_metrics(system_metrics)
# التحقق من شروط التكيف
await self._check_adaptation_conditions(system_metrics)
await asyncio.sleep(5) # كل 5 ثواني
except Exception as e:
print(f"⚠️ خطأ في مراقبة صحة النظام: {e}")
await asyncio.sleep(10)
async def _data_stream_monitor(self):
"""مراقبة تدفقات البيانات في الوقت الحقيقي"""
batch_size = self.config.get('batch_size', 100)
processing_interval = self.config.get('processing_interval', 2)
while self.is_running:
try:
# محاكاة استقبال بيانات من مصادر متعددة
data_batch = await self._fetch_data_batch(batch_size)
if data_batch:
# معالجة فورية للدفعة
processed_data = await self._process_data_batch(data_batch)
# كشف الشذوذ في الوقت الحقيقي
anomalies = await self.anomaly_detector.detect_batch(processed_data)
if anomalies:
await self._handle_anomalies(anomalies)
# تحديث لوحة المراقبة
await self._update_dashboard(processed_data)
await asyncio.sleep(processing_interval)
except Exception as e:
print(f"⚠️ خطأ في مراقبة تدفق البيانات: {e}")
await asyncio.sleep(processing_interval * 2)
async def _performance_optimizer(self):
"""محسن أداء ذاتي التعلم"""
optimization_interval = self.config.get('optimization_interval', 60)
while self.is_running:
try:
# تحليل أنماط الاستخدام
usage_patterns = await self._analyze_usage_patterns()
# تحسين إعدادات المراقبة
await self._optimize_monitoring_settings(usage_patterns)
# ضبط معاملات الخوارزميات
await self._tune_algorithm_parameters()
# تنظيف البيانات القديمة
await self._cleanup_old_data()
await asyncio.sleep(optimization_interval)
except Exception as e:
print(f"⚠️ خطأ في محسن الأداء: {e}")
await asyncio.sleep(optimization_interval)
async def _adaptive_rule_engine(self):
"""محرك قواعد تكيفي ديناميكي"""
rule_evaluation_interval = self.config.get('rule_evaluation_interval', 30)
while self.is_running:
try:
# تحديث القواعد بناءً على السياق
await self._update_adaptation_rules()
# تقييم فعالية القواعد السابقة
await self._evaluate_rule_effectiveness()
# توليد قواعد جديدة
await self._generate_new_rules()
await asyncio.sleep(rule_evaluation_interval)
except Exception as e:
print(f"⚠️ خطأ في محرك القواعد: {e}")
await asyncio.sleep(rule_evaluation_interval)
async def _real_time_analyzer(self):
"""محلل في الوقت الحقيقي للاتجاهات والأنماط"""
analysis_interval = self.config.get('analysis_interval', 10)
while self.is_running:
try:
# تحليل الاتجاهات قصيرة المدى
short_term_trends = await self._analyze_short_term_trends()
# كشف التحولات الطورية
phase_transitions = await self._detect_phase_transitions()
# تحليل التبعيات والارتباطات
correlations = await self._analyze_correlations()
# تحديث نماذج التنبؤ
await self._update_prediction_models({
'trends': short_term_trends,
'transitions': phase_transitions,
'correlations': correlations
})
await asyncio.sleep(analysis_interval)
except Exception as e:
print(f"⚠️ خطأ في المحلل الزمني: {e}")
await asyncio.sleep(analysis_interval)
async def _check_adaptation_conditions(self, metrics: Dict):
"""التحقق من شروط التكيف وتنفيذ الإجراءات"""
triggered_actions = []
for rule_name, rule in self.adaptation_rules.items():
try:
if rule['condition'](metrics):
# تنفيذ إجراء التكيف
result = await rule['action'](metrics)
triggered_actions.append({
'rule': rule_name,
'timestamp': datetime.now(),
'metrics': metrics,
'result': result,
'priority': rule['priority']
})
# تسجيل التكيف
self.adaptation_history.append(triggered_actions[-1])
print(f"🔄 تنفيذ تكيف: {rule_name} - الأولوية: {rule['priority']}")
except Exception as e:
print(f"❌ خطأ في تنفيذ قاعدة التكيف {rule_name}: {e}")
return triggered_actions
async def _reduce_monitoring_frequency(self, metrics: Dict):
"""تقليل تردد المراقبة عند ارتفاع استخدام المعالج"""
current_interval = self.config.get('processing_interval', 2)
new_interval = min(current_interval * 1.5, 10) # لا تزيد عن 10 ثواني
self.config['processing_interval'] = new_interval
return {
'action': 'reduce_monitoring_frequency',
'old_interval': current_interval,
'new_interval': new_interval,
'reason': 'high_cpu_usage'
}
async def _optimize_data_retention(self, metrics: Dict):
"""تحسين استبقاء البيانات عند ضغط الذاكرة"""
retention_days = self.config.get('data_retention_days', 30)
new_retention = max(retention_days - 5, 7) # لا تقل عن 7 أيام
self.config['data_retention_days'] = new_retention
# تنظيف فوري للبيانات القديمة
await self._cleanup_old_data()
return {
'action': 'optimize_data_retention',
'old_retention': retention_days,
'new_retention': new_retention,
'reason': 'memory_pressure'
}
async def _enable_compression(self, metrics: Dict):
"""تمكين ضغط البيانات عند ارتفاع زمن الانتقال"""
if not self.config.get('compression_enabled', False):
self.config['compression_enabled'] = True
self.config['compression_level'] = 2 # مستوى متوسط
return {
'action': 'enable_compression',
'compression_enabled': True,
'compression_level': 2,
'reason': 'network_latency'
}
async def _scale_storage(self, metrics: Dict):
"""توسيع التخزين عند زيادة حجم البيانات"""
current_batch = self.config.get('batch_size', 100)
new_batch = min(current_batch * 2, 1000) # لا تزيد عن 1000
self.config['batch_size'] = new_batch
return {
'action': 'scale_storage',
'old_batch_size': current_batch,
'new_batch_size': new_batch,
'reason': 'data_volume_increase'
}
async def _increase_monitoring_intensity(self, metrics: Dict):
"""زيادة كثافة المراقبة عند اكتشاف شذوذ"""
current_interval = self.config.get('processing_interval', 2)
new_interval = max(current_interval / 2, 0.5) # لا تقل عن 0.5 ثانية
self.config['processing_interval'] = new_interval
# زيادة حساسية كشف الشذوذ
await self.anomaly_detector.increase_sensitivity()
return {
'action': 'increase_monitoring_intensity',
'old_interval': current_interval,
'new_interval': new_interval,
'reason': 'anomaly_spike'
}
# طرق مساعدة
async def _fetch_data_batch(self, batch_size: int):
"""جلب دفعة بيانات (محاكاة)"""
# في التطبيق الحقيقي، يتم جلب البيانات من مصادر حقيقية
await asyncio.sleep(0.1)
return [{'value': np.random.normal(0, 1), 'timestamp': datetime.now()}
for _ in range(batch_size)]
async def _process_data_batch(self, data_batch: List):
"""معالجة دفعة البيانات"""
return [self._enhance_data_point(point) for point in data_batch]
def _enhance_data_point(self, data_point: Dict):
"""تحسين نقطة البيانات بإضافة معلومات سياقية"""
data_point['processed'] = True
data_point['quality_score'] = np.random.uniform(0.8, 1.0)
data_point['context'] = {
'system_load': self.performance_metrics.get('system_load', 0),
'hour_of_day': datetime.now().hour,
'day_of_week': datetime.now().weekday()
}
return data_point
async def _handle_anomalies(self, anomalies: List):
"""معالجة البيانات الشاذة المكتشفة"""
for anomaly in anomalies:
# تسجيل الإنذار
await self._trigger_alert(anomaly)
# تحديث نماذج الكشف
await self.anomaly_detector.update_model(anomaly)
async def _trigger_alert(self, anomaly: Dict):
"""تفعيل إنذار للبيانات الشاذة"""
alert_message = {
'type': 'anomaly_detected',
'timestamp': datetime.now(),
'severity': anomaly.get('severity', 'medium'),
'description': anomaly.get('description', 'Unknown anomaly'),
'data_point': anomaly.get('data_point', {}),
'confidence': anomaly.get('confidence', 0.5)
}
print(f"🚨 إنذار: {alert_message['description']} - الشدة: {alert_message['severity']}")
# في التطبيق الحقيقي، إرسال إلى نظام الإنذار
return alert_message
async def _update_dashboard(self, data: List):
"""تحديث لوحة المراقبة بالبيانات الجديدة"""
# في التطبيق الحقيقي، تحديث through WebSocket أو مشابه
pass
async def _store_metrics(self, metrics: Dict):
"""تخزين مقاييس الأداء"""
# محاكاة التخزين في قاعدة بيانات
pass
async def _analyze_usage_patterns(self):
"""تحليل أنماط استخدام النظام"""
return {'pattern': 'normal', 'confidence': 0.8}
async def _optimize_monitoring_settings(self, patterns: Dict):
"""تحسين إعدادات المراقبة بناءً على أنماط الاستخدام"""
pass
async def _tune_algorithm_parameters(self):
"""ضبط معاملات الخوارزميات"""
pass
async def _cleanup_old_data(self):
"""تنظيف البيانات القديمة"""
pass
async def _update_adaptation_rules(self):
"""تحديث قواعد التكيف"""
pass
async def _evaluate_rule_effectiveness(self):
"""تقييم فعالية القواعد"""
pass
async def _generate_new_rules(self):
"""توليد قواعد تكيف جديدة"""
pass
async def _analyze_short_term_trends(self):
"""تحليل الاتجاهات قصيرة المدى"""
return {'trend': 'stable', 'direction': 'neutral'}
async def _detect_phase_transitions(self):
"""كشف التحولات الطورية"""
return {'transition_detected': False, 'confidence': 0.0}
async def _analyze_correlations(self):
"""تحليل الارتباطات بين المتغيرات"""
return {'correlations': {}}
async def _update_prediction_models(self, analysis_results: Dict):
"""تحديث نماذج التنبؤ"""
pass
class AdaptiveAnomalyDetector: """كاشف شذوذ متكيف مع التغيرات في البيانات"""
def __init__(self):
self.sensitivity_level = 1.0
self.learning_rate = 0.01
self.model_weights = {}
self.detection_history = []
async def detect_batch(self, data_batch: List):
"""كشف الشذوذ في دفعة البيانات"""
anomalies = []
for data_point in data_batch:
anomaly_score = await self._calculate_anomaly_score(data_point)
if anomaly_score > self.sensitivity_level:
anomaly = {
'data_point': data_point,
'score': anomaly_score,
'severity': self._determine_severity(anomaly_score),
'description': self._generate_description(anomaly_score),
'confidence': min(anomaly_score / 2.0, 1.0),
'timestamp': datetime.now()
}
anomalies.append(anomaly)
return anomalies
async def _calculate_anomaly_score(self, data_point: Dict):
"""حساب درجة الشذوذ"""
# خوارزمية كشف شذوذ مبسطة
base_score = abs(data_point.get('value', 0))
# عوامل السياق
context_factor = 1.0
if data_point.get('context'):
context = data_point['context']
if context.get('system_load', 0) > 80:
context_factor *= 1.2
if context.get('hour_of_day', 0) in [2, 3, 4]: # ساعات الليل
context_factor *= 0.8
return base_score * context_factor * self.sensitivity_level
def _determine_severity(self, anomaly_score: float):
"""تحديد شدة الشذوذ"""
if anomaly_score > 2.0:
return 'high'
elif anomaly_score > 1.5:
return 'medium'
else:
return 'low'
def _generate_description(self, anomaly_score: float):
"""توليد وصف للشذوذ"""
if anomaly_score > 2.0:
return "شذوذ حرج - انحراف كبير عن السلوك الطبيعي"
elif anomaly_score > 1.5:
return "شذوذ متوسط - انحراف ملحوظ"
else:
return "شذوذ طفيف - انحراف بسيط"
async def increase_sensitivity(self):
"""زيادة حساسية الكشف"""
self.sensitivity_level = max(0.5, self.sensitivity_level * 0.8) # تقليل العتبة
print(f"🔍 زيادة حساسية كشف الشذوذ إلى: {self.sensitivity_level:.3f}")
async def update_model(self, anomaly: Dict):
"""تحديث النموذج بناءً على الشذوذ المكتشف"""
# تحديث تعلم الآلة (مبسط)
learning_factor = self.learning_rate * anomaly.get('confidence', 0.5)
# في التطبيق الحقيقي، تحديث أوزان النموذج
class ResourceManager: """مدير موارد ذكي لإدارة موارد النظام"""
def __init__(self):
self.resource_limits = {
'max_cpu_percent': 85,
'max_memory_percent': 80,
'max_disk_usage': 90,
'max_processes': 1000
}
async def check_resource_limits(self):
"""التحقق من حدود الموارد"""
metrics = {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent,
'process_count': len(psutil.pids())
}
violations = []
for resource, value in metrics.items():
limit = self.resource_limits.get(f'max_{resource}')
if limit and value > limit:
violations.append({
'resource': resource,
'value': value,
'limit': limit,
'excess': value - limit
})
return violations
import dash
from dash import dcc, html, Input, Output, State, callback_context import plotly.graph_objects as go from plotly.subplots import make_subplots import dash_bootstrap_components as dbc from datetime import datetime, timedelta import pandas as pd
class RealTimeComprehensiveDashboard: """لوحة مراقبة شاملة بتحديث حي وواجهة تفاعلية متقدمة"""
def __init__(self, monitoring_system):
self.monitoring_system = monitoring_system
self.app = dash.Dash(__name__,
external_stylesheets=[
dbc.themes.DARKLY,
'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
],
suppress_callback_exceptions=True)
self.setup_layout()
self.setup_callbacks()
self.live_data = {
'system_metrics': [],
'anomalies': [],
'adaptations': [],
'predictions': []
}
def setup_layout(self):
"""إعداد تخطيط لوحة التحكم المتقدمة"""
self.app.layout = dbc.Container([
# رأس اللوحة مع التنقل
self._create_header(),
# مؤشرات الأداء الرئيسية في الوقت الحقيقي
self._create_realtime_kpis(),
# قسم الرسوم البيانية الرئيسية
self._create_main_charts_section(),
# قسم الإنذارات والتحذيرات
self._create_alerts_section(),
# قسم التحليلات المتقدمة
self._create_advanced_analytics_section(),
# لوحة التحكم التفاعلية
self._create_control_panel(),
# التحديث التلقائي
dcc.Interval(
id='interval-component',
interval=2*1000, # 2 ثانية
n_intervals=0
),
dcc.Store(id='live-data-store'),
dcc.Store(id='alert-history-store'),
dcc.Store(id='system-status-store')
], fluid=True, style={'backgroundColor': '#1a1a1a', 'minHeight': '100vh'})
def _create_header(self):
"""إنشاء رأس اللوحة مع التنقل"""
return dbc.Navbar(
[
dbc.Container([
dbc.Row([
dbc.Col([
html.H1([
html.I(className="fas fa-radar-dashboard me-2"),
"نظام المراقبة الذكي المتكامل"
], className="text-light mb-0")
], width="auto"),
dbc.Col([
dbc.Nav([
dbc.NavItem(dbc.NavLink("نظرة عامة", href="#overview", external_link=True)),
dbc.NavItem(dbc.NavLink("التحليلات", href="#analytics", external_link=True)),
dbc.NavItem(dbc.NavLink("الإنذارات", href="#alerts", external_link=True)),
dbc.NavItem(dbc.NavLink("الإعدادات", href="#settings", external_link=True)),
], navbar=True)
], width="auto")
], align="center"),
dbc.Row([
dbc.Col([
html.Div([
html.Span("الحالة: ", className="text-light"),
dbc.Badge("نشط", color="success", className="me-2"),
html.Span("آخر تحديث: ", className="text-light"),
html.Span(id="last-update-time", className="text-warning")
])
])
], className="mt-2")
])
],
color="dark",
dark=True,
sticky="top"
)
def _create_realtime_kpis(self):
"""إنشاء مؤشرات الأداء الرئيسية في الوقت الحقيقي"""
return dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardBody([
html.Div([
html.I(className="fas fa-heart-pulse fa-2x text-success me-3"),
html.Div([
html.H4("صحة النظام", className="card-title text-light"),
html.H2("95%", id="system-health-kpi", className="text-success")
])
], className="d-flex align-items-center")
])
], color="dark", className="h-100")
], width=3),
dbc.Col([
dbc.Card([
dbc.CardBody([
html.Div([
html.I(className="fas fa-bell fa-2x text-warning me-3"),
html.Div([
html.H4("الإنذارات النشطة", className="card-title text-light"),
html.H2("3", id="active-alerts-kpi", className="text-warning")
])
], className="d-flex align-items-center")
])
], color="dark", className="h-100")
], width=3),
dbc.Col([
dbc.Card([
dbc.CardBody([
html.Div([
html.I(className="fas fa-chart-line fa-2x text-info me-3"),
html.Div([
html.H4("كفاءة المراقبة", className="card-title text-light"),
html.H2("87%", id="efficiency-kpi", className="text-info")
])
], className="d-flex align-items-center")
])
], color="dark", className="h-100")
], width=3),
dbc.Col([
dbc.Card([
dbc.CardBody([
html.Div([
html.I(className="fas fa-robot fa-2x text-primary me-3"),
html.Div([
html.H4("التكيفات الذكية", className="card-title text-light"),
html.H2("12", id="adaptations-kpi", className="text-primary")
])
], className="d-flex align-items-center")
])
], color="dark", className="h-100")
], width=3)
], className="mb-4")
def _create_main_charts_section(self):
"""إنشاء قسم الرسوم البيانية الرئيسية"""
return html.Div([
html.H3("المراقبة في الوقت الحقيقي", className="text-light mb-3"),
dbc.Row([
# رسم تدفق البيانات
dbc.Col([
dbc.Card([
dbc.CardHeader("📊 تدفق البيانات الحي", className="text-light"),
dbc.CardBody([
dcc.Graph(id='live-data-stream')
])
], color="dark")
], width=8),
# حالة النظام
dbc.Col([
dbc.Card([
dbc.CardHeader("⚙️ حالة النظام", className="text-light"),
dbc.CardBody([
dcc.Graph(id='system-status-gauge')
])
], color="dark")
], width=4)
], className="mb-4"),
dbc.Row([
# تحليل الشذوذ
dbc.Col([
dbc.Card([
dbc.CardHeader("🔍 تحليل الشذوذ", className="text-light"),
dbc.CardBody([
dcc.Graph(id='anomaly-analysis-chart')
])
], color="dark")
], width=6),
# التكيف الذاتي
dbc.Col([
dbc.Card([
dbc.CardHeader("🔄 التكيف الذاتي", className="text-light"),
dbc.CardBody([
dcc.Graph(id='adaptation-timeline')
])
], color="dark")
], width=6)
])
])
def _create_alerts_section(self):
"""إنشاء قسم الإنذارات والتحذيرات"""
return html.Div([
html.H3("الإنذارات الذكية", className="text-light mb-3"),
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardHeader("🚨 الإنذارات الفورية", className="text-light"),
dbc.CardBody([
html.Div(id="live-alerts-container", children=[
self._create_alert_item("شذوذ حرج", "high", "5 دقائق", "انحراف في تدفق البيانات"),
self._create_alert_item("ضغط ذاكرة", "medium", "10 دقائق", "استخدام الذاكرة تجاوز 80%"),
self._create_alert_item("تباطؤ شبكة", "low", "15 دقائق", "زيادة في زمن الانتقال")
])
])
], color="dark")
], width=12)
])
])
def _create_alert_item(self, title, severity, time_ago, description):
"""إنشاء عنصر إنذار فردي"""
severity_colors = {
'high': 'danger',
'medium': 'warning',
'low': 'info'
}
severity_icons = {
'high': 'exclamation-triangle',
'medium': 'exclamation-circle',
'low': 'info-circle'
}
return dbc.Alert([
html.Div([
html.I(className=f"fas fa-{severity_icons[severity]} me-2"),
html.Strong(title, className="me-2"),
html.Small(f"منذ {time_ago}", className="text-muted"),
dbc.Badge(severity.upper(), color=severity_colors[severity], className="ms-2")
]),
html.P(description, className="mb-0 mt-2")
], color=severity_colors[severity], className="mb-2")
def _create_advanced_analytics_section(self):
"""إنشاء قسم التحليلات المتقدمة"""
return html.Div([
html.H3("التحليلات المتقدمة", className="text-light mb-3"),
dbc.Row([
# تحليل الاتجاهات
dbc.Col([
dbc.Card([
dbc.CardHeader("📈 تحليل الاتجاهات", className="text-light"),
dbc.CardBody([
dcc.Graph(id='trend-analysis-chart')
])
], color="dark")
], width=6),
# التنبؤ بالتحولات
dbc.Col([
dbc.Card([
dbc.CardHeader("🔮 التنبؤ بالتحولات", className="text-light"),
dbc.CardBody([
dcc.Graph(id='phase-prediction-chart')
])
], color="dark")
], width=6)
])
])
def _create_control_panel(self):
"""إنشاء لوحة التحكم التفاعلية"""
return dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardHeader("🎛️ لوحة التحكم الذكية", className="text-light"),
dbc.CardBody([
dbc.Row([
dbc.Col([
html.Label("معدل التحديث:", className="text-light"),
dcc.Dropdown(
id='update-rate-selector',
options=[
{'label': '⏱️ سريع (1 ثانية)', 'value': 1000},
{'label': '🔄 عادي (2 ثانية)', 'value': 2000},
{'label': '🐌 بطيء (5 ثواني)', 'value': 5000}
],
value=2000,
className="mb-3"
)
], width=4),
dbc.Col([
html.Label("حساسية الإنذار:", className="text-light"),
dcc.Slider(
id='alert-sensitivity-slider',
min=0.1,
max=1.0,
step=0.1,
value=0.7,
marks={0.1: 'منخفضة', 0.5: 'متوسطة', 1.0: 'عالية'}
)
], width=4),
dbc.Col([
html.Label("استراتيجية التكيف:", className="text-light"),
dcc.Dropdown(
id='adaptation-strategy-selector',
options=[
{'label': '🛡️ وقائي', 'value': 'preventive'},
{'label': '⚡ تفاعلي', 'value': 'reactive'},
{'label': '🤖 توقعي', 'value': 'predictive'}
],
value='predictive',
className="mb-3"
)
], width=4)
]),
dbc.Row([
dbc.Col([
dbc.ButtonGroup([
dbc.Button("🔄 إعادة تعيين", id="reset-btn", color="secondary"),
dbc.Button("📊 تقرير فوري", id="report-btn", color="info"),
dbc.Button("🧹 تنظيف بيانات", id="cleanup-btn", color="warning"),
dbc.Button("🛑 طوارئ", id="emergency-btn", color="danger")
], className="w-100")
])
])
])
], color="dark")
], width=12)
], className="mt-4")
def setup_callbacks(self):
"""إعداد callback functions للتحديث التفاعلي"""
@self.app.callback(
[Output('live-data-stream', 'figure'),
Output('system-status-gauge', 'figure'),
Output('anomaly-analysis-chart', 'figure'),
Output('adaptation-timeline', 'figure'),
Output('trend-analysis-chart', 'figure'),
Output('phase-prediction-chart', 'figure'),
Output('system-health-kpi', 'children'),
Output('active-alerts-kpi', 'children'),
Output('efficiency-kpi', 'children'),
Output('adaptations-kpi', 'children'),
Output('last-update-time', 'children'),
Output('live-alerts-container', 'children')],
[Input('interval-component', 'n_intervals'),
Input('update-rate-selector', 'value'),
Input('alert-sensitivity-slider', 'value')]
)
def update_all_components(n_intervals, update_rate, sensitivity):
"""تحديث جميع مكونات اللوحة"""
# محاكاة بيانات حية
live_data = self._generate_live_data()
current_time = datetime.now().strftime("%H:%M:%S")
# تحديث الرسوم البيانية
data_stream_fig = self._create_data_stream_figure(live_data)
status_gauge_fig = self._create_status_gauge(live_data)
anomaly_fig = self._create_anomaly_chart(live_data)
adaptation_fig = self._create_adaptation_timeline(live_data)
trend_fig = self._create_trend_analysis(live_data)
prediction_fig = self._create_phase_prediction(live_data)
# تحديث مؤشرات الأداء
health_kpi = f"{live_data['system_health']}%"
alerts_kpi = str(live_data['active_alerts'])
efficiency_kpi = f"{live_data['efficiency']}%"
adaptations_kpi = str(live_data['adaptation_count'])
# تحديث الإنذارات
alerts_container = self._create_live_alerts(live_data['recent_alerts'])
return (data_stream_fig, status_gauge_fig, anomaly_fig, adaptation_fig,
trend_fig, prediction_fig, health_kpi, alerts_kpi, efficiency_kpi,
adaptations_kpi, current_time, alerts_container)
@self.app.callback(
Output('interval-component', 'interval'),
[Input('update-rate-selector', 'value')]
)
def update_refresh_rate(selected_rate):
"""تحديث معدل التحديث"""
return selected_rate
@self.app.callback(
[Output('reset-btn', 'children'),
Output('report-btn', 'children')],
[Input('reset-btn', 'n_clicks'),
Input('report-btn', 'n_clicks')]
)
def handle_control_buttons(reset_clicks, report_clicks):
"""معالجة أزرار التحكم"""
ctx = callback_context
if not ctx.triggered:
return "🔄 إعادة تعيين", "📊 تقرير فوري"
button_id = ctx.triggered[0]['prop_id'].split('.')[0]
if button_id == 'reset-btn' and reset_clicks:
return "✅ تم الإعادة", "📊 تقرير فوري"
elif button_id == 'report-btn' and report_clicks:
return "🔄 إعادة تعيين", "✅ تم التقرير"
return "🔄 إعادة تعيين", "📊 تقرير فوري"
def _generate_live_data(self):
"""توليد بيانات حية محاكاة"""
current_time = datetime.now()
# بيانات نظام محاكاة
system_health = max(80, min(99, 90 + np.random.normal(0, 5)))
active_alerts = max(0, int(3 + np.random.normal(0, 1)))
efficiency = max(75, min(95, 85 + np.random.normal(0, 5)))
return {
'timestamp': current_time,
'system_health': system_health,
'active_alerts': active_alerts,
'efficiency': efficiency,
'adaptation_count': 12 + np.random.randint(0, 3),
'data_points': [np.random.normal(0, 1) for _ in range(50)],
'anomaly_scores': [max(0, np.random.normal(0.5, 0.3)) for _ in range(20)],
'recent_alerts': [
{'title': 'شذوذ حرج', 'severity': 'high', 'time': '5 دقائق', 'desc': 'انحراف في تدفق البيانات'},
{'title': 'ضغط ذاكرة', 'severity': 'medium', 'time': '10 دقائق', 'desc': 'استخدام الذاكرة تجاوز 80%'},
{'title': 'تباطؤ شبكة', 'severity': 'low', 'time': '15 دقائق', 'desc': 'زيادة في زمن الانتقال'}
]
}
def _create_data_stream_figure(self, data):
"""إنشاء رسم تدفق البيانات الحي"""
fig = go.Figure()
# بيانات التدفق
time_points = [datetime.now() - timedelta(seconds=x) for x in range(50, 0, -1)]
fig.add_trace(go.Scatter(
x=time_points,
y=data['data_points'],
mode='lines',
name='تدفق البيانات',
line=dict(color='#00ff88', width=2),
fill='tozeroy',
fillcolor='rgba(0, 255, 136, 0.1)'
))
fig.update_layout(
title="تدفق البيانات في الوقت الحقيقي",
xaxis_title="الزمن",
yaxis_title="قيمة البيانات",
template="plotly_dark",
height=300,
showlegend=True
)
return fig
def _create_status_gauge(self, data):
"""إنشاء مقياس حالة النظام"""
fig = go.Figure(go.Indicator(
mode="gauge+number+delta",
value=data['system_health'],
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': "صحة النظام", 'font': {'size': 24}},
delta={'reference': 90, 'increasing': {'color': "green"}},
gauge={
'axis': {'range': [0, 100], 'tickwidth': 1},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 70], 'color': 'red'},
{'range': [70, 85], 'color': 'yellow'},
{'range': [85, 100], 'color': 'green'}],
'threshold': {
'line': {'color': "white", 'width': 4},
'thickness': 0.75,
'value': data['system_health']}
}
))
fig.update_layout(height=300, font={'color': "white"})
return fig
def _create_anomaly_chart(self, data):
"""إنشاء رسم تحليل الشذوذ"""
fig = go.Figure()
fig.add_trace(go.Histogram(
x=data['anomaly_scores'],
name='توزيع الشذوذ',
marker_color='#ff4444',
opacity=0.7
))
fig.add_vline(x=0.7, line_dash="dash", line_color="yellow",
annotation_text="عتبة الإنذار")
fig.update_layout(
title="تحليل توزيع درجات الشذوذ",
xaxis_title="درجة الشذوذ",
yaxis_title="التكرار",
template="plotly_dark",
height=300
)
return fig
def _create_adaptation_timeline(self, data):
"""إنشاء خط زمني للتكيفات"""
adaptations = [
{'time': '10:30', 'action': 'تقليل التردد', 'reason': 'ارتفاع CPU'},
{'time': '10:25', 'action': 'ضغط البيانات', 'reason': 'تباطؤ شبكة'},
{'time': '10:20', 'action': 'زيادة الحساسية', 'reason': 'شذوذ متكرر'},
{'time': '10:15', 'action': 'تحسين التخزين', 'reason': 'زيادة بيانات'}
]
fig = go.Figure()
for i, adapt in enumerate(adaptations):
fig.add_trace(go.Scatter(
x=[adapt['time']],
y=[i],
mode='markers+text',
name=adapt['action'],
text=[adapt['action']],
textposition="middle right",
marker=dict(size=15, color=['#ffaa00', '#00aaff', '#aaff00', '#ff00aa'][i])
))
fig.update_layout(
title="الخط الزمني للتكيفات الذاتية",
xaxis_title="الزمن",
yaxis=dict(showticklabels=False),
template="plotly_dark",
height=300,
showlegend=False
)
return fig
def _create_trend_analysis(self, data):
"""إنشاء رسم تحليل الاتجاهات"""
trends = np.cumsum(np.random.normal(0, 0.1, 30))
fig = go.Figure()
fig.add_trace(go.Scatter(
y=trends,
mode='lines',
name='الاتجاه',
line=dict(color='#00aaff', width=3)
))
# خط الاتجاه
z = np.polyfit(range(len(trends)), trends, 1)
p = np.poly1d(z)
fig.add_trace(go.Scatter(
y=p(range(len(trends))),
mode='lines',
name='اتجاه عام',
line=dict(color='#ffaa00', width=2, dash='dash')
))
fig.update_layout(
title="تحليل الاتجاهات قصيرة المدى",
template="plotly_dark",
height=300
)
return fig
def _create_phase_prediction(self, data):
"""إنشاء رسم التنبؤ بالتحولات الطورية"""
time_points = list(range(24))
current_state = np.sin(np.array(time_points) * 0.3) + 0.5
predicted_state = np.sin(np.array(time_points) * 0.3 + 1.0) + 0.5
fig = go.Figure()
fig.add_trace(go.Scatter(
x=time_points[:12],
y=current_state[:12],
mode='lines',
name='الحالة الحالية',
line=dict(color='#00ff88', width=3)
))
fig.add_trace(go.Scatter(
x=time_points[11:],
y=predicted_state[11:],
mode='lines',
name='التنبؤ',
line=dict(color='#ff4444', width=3, dash='dash')
))
fig.add_vline(x=11.5, line_dash="dot", line_color="yellow",
annotation_text="نقطة تحول متوقعة")
fig.update_layout(
title="التنبؤ بالتحولات الطورية",
template="plotly_dark",
height=300
)
return fig
def _create_live_alerts(self, alerts_data):
"""إنذارات حية تفاعلية"""
return [self._create_alert_item(alert['title'], alert['severity'],
alert['time'], alert['desc'])
for alert in alerts_data]
def run_dashboard(self, debug=True, port=8050):
"""تشغيل لوحة المراقبة"""
print(f"🚀 تشغيل لوحة المراقبة الشاملة على: http://localhost:{port}")
self.app.run_server(debug=debug, port=port)
async def main(): """الدالة الرئيسية لتشغيل النظام المتكامل"""
# تكوين نظام المراقبة
config = {
'batch_size': 100,
'processing_interval': 2,
'data_retention_days': 30,
'compression_enabled': False,
'optimization_interval': 60,
'rule_evaluation_interval': 30,
'analysis_interval': 10
}
# إنشاء نظام المراقبة
monitor = AdaptiveContinuousMonitor(config)
# إنشاء لوحة المراقبة
dashboard = RealTimeComprehensiveDashboard(monitor)
print("✅ تم تهيئة النظام المتكامل بنجاح")
print("📊 لوحة المراقبة جاهزة للتشغيل")
print("🔍 نظام المراقبة المستمرة نشط")
# تشغيل النظام (في التطبيق الحقيقي، يتم التشغيل في خيوط منفصلة)
try:
# تشغيل لوحة المراقبة
dashboard.run_dashboard(debug=True, port=8050)
except Exception as e:
print(f"❌ خطأ في تشغيل النظام: {e}")
if name == "main": # تشغيل النظام asyncio.run(main()) class IntelligentEarlyWarningSystem: """نظام إنذار مبكر ذكي يتعلم من الأنماط ويتكيف مع التغيرات"""
def __init__(self):
self.warning_rules = self._initialize_warning_rules()
self.learning_model = WarningLearningModel()
self.alert_history = []
self.pattern_database = {}
self.adaptive_thresholds = {}
def _initialize_warning_rules(self):
"""تهيئة قواعد الإنذار الذكية"""
return {
'critical_slowing': {
'detector': self._detect_critical_slowing,
'threshold': 0.7,
'severity': 'high',
'learning_enabled': True
},
'variance_spike': {
'detector': self._detect_variance_spike,
'threshold': 2.0,
'severity': 'medium',
'learning_enabled': True
},
'pattern_anomaly': {
'detector': self._detect_pattern_anomaly,
'threshold': 0.8,
'severity': 'high',
'learning_enabled': True
},
'trend_reversal': {
'detector': self._detect_trend_reversal,
'threshold': 0.6,
'severity': 'medium',
'learning_enabled': True
},
'correlation_breakdown': {
'detector': self._detect_correlation_breakdown,
'threshold': 0.5,
'severity': 'low',
'learning_enabled': True
}
}
async def analyze_continuous_stream(self, data_stream):
"""تحليل مستمر لتدفق البيانات لاكتشاف الإنذارات المبكرة"""
warnings_detected = []
for data_batch in data_stream:
batch_warnings = await self._analyze_batch(data_batch)
warnings_detected.extend(batch_warnings)
# تحديث النماذج التعلمية
await self._update_learning_models(batch_warnings, data_batch)
# تكيف العتبات
await self._adapt_thresholds(data_batch)
return warnings_detected
async def _analyze_batch(self, data_batch):
"""تحليل دفعة البيانات لاكتشاف الإنذارات"""
warnings = []
for rule_name, rule_config in self.warning_rules.items():
try:
detection_result = await rule_config['detector'](data_batch)
if detection_result['score'] > rule_config['threshold']:
warning = self._create_warning(
rule_name=rule_name,
score=detection_result['score'],
severity=rule_config['severity'],
data_evidence=detection_result.get('evidence', {}),
context=detection_result.get('context', {})
)
warnings.append(warning)
except Exception as e:
print(f"⚠️ خطأ في قاعدة الإنذار {rule_name}: {e}")
return warnings
async def _detect_critical_slowing(self, data_batch):
"""كشف التباطؤ الحرج (إشارة تحول طوري)"""
if len(data_batch) < 20:
return {'score': 0, 'evidence': {}}
# حساب التباين والارتباط الذاتي
values = [point.get('value', 0) for point in data_batch]
variance = np.var(values)
autocorr = self._calculate_autocorrelation(values, lag=1)
# مؤشر التباطؤ الحرج
slowing_score = min(1.0, variance * autocorr * 2)
return {
'score': slowing_score,
'evidence': {
'variance': variance,
'autocorrelation': autocorr,
'data_points_analyzed': len(values)
},
'context': {'detection_method': 'critical_slowing'}
}
async def _detect_variance_spike(self, data_batch):
"""كشف spikes في التباين"""
if len(data_batch) < 10:
return {'score': 0, 'evidence': {}}
values = [point.get('value', 0) for point in data_batch]
# تقسيم إلى نوافذ
window_size = min(5, len(values) // 2)
variances = []
for i in range(0, len(values) - window_size + 1, window_size):
window = values[i:i + window_size]
variances.append(np.var(window))
if len(variances) < 2:
return {'score': 0, 'evidence': {}}
# حساب نسبة التغير
variance_ratio = max(variances) / (min(variances) + 1e-8)
spike_score = min(1.0, variance_ratio / 5.0)
return {
'score': spike_score,
'evidence': {
'variance_ratio': variance_ratio,
'max_variance': max(variances),
'min_variance': min(variances)
}
}
async def _detect_pattern_anomaly(self, data_batch):
"""كشف شذوذ في الأنماط"""
# تحليل الأنماط باستخدام تحويل فورييه السريع
values = [point.get('value', 0) for point in data_batch]
if len(values) < 8:
return {'score': 0, 'evidence': {}}
# تحليل الترددات
fft_values = np.fft.fft(values)
frequencies = np.fft.fftfreq(len(values))
# البحث عن ترددات غير عادية
power_spectrum = np.abs(fft_values) ** 2
unusual_freq_power = np.sum(power_spectrum[frequencies > 0.3])
total_power = np.sum(power_spectrum)
anomaly_score = unusual_freq_power / (total_power + 1e-8)
return {
'score': anomaly_score,
'evidence': {
'unusual_frequency_power': unusual_freq_power,
'total_power': total_power,
'dominant_frequency': frequencies[np.argmax(power_spectrum)]
}
}
async def _detect_trend_reversal(self, data_batch):
"""كشف انعكاسات الاتجاه"""
if len(data_batch) < 15:
return {'score': 0, 'evidence': {}}
values = [point.get('value', 0) for point in data_batch]
# تحليل الاتجاه باستخدام انحدار خطي
x = np.arange(len(values))
slope, _ = np.polyfit(x, values, 1)
# تحليل التغير في المشتق (معدل التغير)
derivatives = np.diff(values)
sign_changes = np.sum(np.diff(np.sign(derivatives)) != 0)
reversal_score = min(1.0, (abs(slope) * sign_changes) / len(values))
return {
'score': reversal_score,
'evidence': {
'slope': slope,
'sign_changes': sign_changes,
'derivative_std': np.std(derivatives)
}
}
async def _detect_correlation_breakdown(self, data_batch):
"""كشف انهيار في الارتباطات"""
if len(data_batch) < 10:
return {'score': 0, 'evidence': {}}
# افتراض أن البيانات تحتوي على متغيرات متعددة
# في التطبيق الحقيقي، يتم تحليل الارتباط بين المتغيرات المختلفة
values = [point.get('value', 0) for point in data_batch]
# محاكاة تحليل الارتباط
correlation_stability = np.random.uniform(0.3, 0.9)
breakdown_score = 1.0 - correlation_stability
return {
'score': breakdown_score,
'evidence': {
'correlation_stability': correlation_stability,
'breakdown_magnitude': breakdown_score
}
}
def _create_warning(self, rule_name, score, severity, data_evidence, context):
"""إنشاء إنذار منظم"""
warning_id = f"WARN_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{rule_name}"
return {
'id': warning_id,
'timestamp': datetime.now(),
'rule': rule_name,
'score': score,
'severity': severity,
'evidence': data_evidence,
'context': context,
'status': 'active',
'auto_actions_taken': self._determine_auto_actions(severity, score),
'recommended_actions': self._generate_recommendations(rule_name, severity, data_evidence)
}
def _determine_auto_actions(self, severity, score):
"""تحديد الإجراءات التلقائية بناءً على شدة الإنذار"""
actions = []
if severity == 'high' and score > 0.8:
actions.extend([
'زيادة كثافة المراقبة',
'تفعيل نماذج الطوارئ',
'إشعار المسؤولين'
])
elif severity == 'medium' and score > 0.6:
actions.extend([
'تسجيل مفصل للبيانات',
'مراقبة مكثفة'
])
elif severity == 'low':
actions.append('تسجيل الإنذار')
return actions
def _generate_recommendations(self, rule_name, severity, evidence):
"""توليد توصيات بناءً على نوع الإنذار وشدة"""
recommendations = []
base_recommendations = {
'critical_slowing': [
"مراجعة إستراتيجية المراقبة الحالية",
"تفعيل آليات التكيف الوقائية",
"تحضير خطط طوارئ للتحول الطوري"
],
'variance_spike': [
"التحقق من مصدر البيانات",
"مراجعة معاملات كشف الشذوذ",
"تحليل استقرار النظام"
],
'pattern_anomaly': [
"دراسة الأنماط الجديدة",
"تحديث نماذج التعلم الآلي",
"مراجعة قواعد الإنذار"
],
'trend_reversal': [
"تحليل أسباب انعكاس الاتجاه",
"تقييم تأثير التغيرات",
"ضبط توقعات الأداء"
],
'correlation_breakdown': [
"فحص العلاقات بين المتغيرات",
"مراجعة الاعتماديات",
"تحسين نماذج الارتباط"
]
}
recommendations.extend(base_recommendations.get(rule_name, []))
# إضافة توصيات خاصة بالشدة
if severity == 'high':
recommendations.append("⚠️ التدخل الفوري مطلوب")
elif severity == 'medium':
recommendations.append("🔍 المراقبة المستمرة ضرورية")
return recommendations
async def _update_learning_models(self, warnings, data_batch):
"""تحديث نماذج التعلم من الإنذارات والبيانات"""
for warning in warnings:
await self.learning_model.update_from_warning(warning, data_batch)
async def _adapt_thresholds(self, data_batch):
"""تكيف عتبات الإنذار تلقائياً"""
for rule_name, rule_config in self.warning_rules.items():
if rule_config.get('learning_enabled', False):
new_threshold = await self._calculate_adaptive_threshold(
rule_name, data_batch
)
self.warning_rules[rule_name]['threshold'] = new_threshold
async def _calculate_adaptive_threshold(self, rule_name, data_batch):
"""حساب عتبة تكيفية بناءً على أنماط البيانات"""
# منطق تكيفي مبسط
current_threshold = self.warning_rules[rule_name]['threshold']
# تحليل توزيع البيانات الأخيرة
values = [point.get('value', 0) for point in data_batch]
data_std = np.std(values)
data_mean = np.mean(values)
# ضبط العتبة بناءً على تباين البيانات
if data_std > 1.0:
new_threshold = current_threshold * 1.1 # زيادة العتبة للبيانات المتباينة
elif data_std < 0.1:
new_threshold = current_threshold * 0.9 # خفض العتبة للبيانات المستقرة
else:
new_threshold = current_threshold
return max(0.1, min(1.0, new_threshold))
def _calculate_autocorrelation(self, data, lag=1):
"""حساب الارتباط الذاتي"""
if len(data) <= lag:
return 0.0
shifted = data[lag:]
original = data[:-lag]
if np.std(original) == 0 or np.std(shifted) == 0:
return 0.0
return np.corrcoef(original, shifted)[0, 1]
class WarningLearningModel: """نموذج تعلم آلي للإنذارات يتكيف مع الأنماط الجديدة"""
def __init__(self):
self.pattern_memory = {}
self.false_positive_history = []
self.true_positive_history = []
self.learning_rate = 0.01
async def update_from_warning(self, warning, data_batch):
"""تحديث النموذج بناءً على الإنذار والبيانات"""
warning_pattern = self._extract_pattern(warning, data_batch)
pattern_key = self._generate_pattern_key(warning_pattern)
if pattern_key in self.pattern_memory:
# تحديث النمط الموجود
self.pattern_memory[pattern_key]['frequency'] += 1
self.pattern_memory[pattern_key]['last_seen'] = datetime.now()
else:
# تسجيل نمط جديد
self.pattern_memory[pattern_key] = {
'pattern': warning_pattern,
'frequency': 1,
'first_seen': datetime.now(),
'last_seen': datetime.now(),
'severity': warning['severity']
}
def _extract_pattern(self, warning, data_batch):
"""استخراج النمط من الإنذار والبيانات"""
return {
'rule': warning['rule'],
'severity': warning['severity'],
'evidence_keys': list(warning['evidence'].keys()),
'data_statistics': {
'mean': np.mean([point.get('value', 0) for point in data_batch]),
'std': np.std([point.get('value', 0) for point in data_batch]),
'length': len(data_batch)
},
'timestamp_features': {
'hour': warning['timestamp'].hour,
'day_of_week': warning['timestamp'].weekday()
}
}
def _generate_pattern_key(self, pattern):
"""توليد مفتاح فريد للنمط"""
return hash(json.dumps(pattern, sort_keys=True, default=str))