import 'dart:async';
import 'dart:convert';

import 'package:flutter/foundation.dart';
import 'package:flutter_dotenv/flutter_dotenv.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import '../models/price_data.dart';

/// Streams live price updates from the backend over a WebSocket.
///
/// Call [connect] to open the socket and subscribe to the `price_updates`
/// channel. Decoded prices are published on [latestPriceStream] and coarse
/// connection state (`'Connected'`, `'Disconnected'`, `'Error'`) on
/// [connectionStatusStream]. A dropped or failed connection is retried
/// automatically after a fixed delay until [dispose] is called.
class WebSocketService {
  /// Endpoint used when `WS_URL` is absent from the loaded environment.
  static const _fallbackUrl = 'ws://localhost:3000/ws/index';

  /// Delay between a dropped/failed connection and the next attempt.
  static const _reconnectDelay = Duration(seconds: 5);

  WebSocketChannel? _channel;
  StreamSubscription<dynamic>? _subscription;
  final _latestPriceController = StreamController<LatestPrice>.broadcast();
  final _connectionStatusController = StreamController<String>.broadcast();
  Timer? _reconnectTimer;
  bool _isConnecting = false;
  bool _isDisposed = false;

  /// Broadcast stream of prices decoded from `price_update` messages.
  Stream<LatestPrice> get latestPriceStream => _latestPriceController.stream;

  /// Broadcast stream of connection state changes.
  ///
  /// Emits `'Connected'`, `'Disconnected'` or `'Error'`.
  Stream<String> get connectionStatusStream =>
      _connectionStatusController.stream;

  /// Opens the WebSocket and subscribes to price updates.
  ///
  /// Any previously open channel is closed first, and a pending automatic
  /// reconnect is cancelled, so at most one connection is ever live. Does
  /// nothing if a connection attempt is already in progress or the service
  /// has been disposed.
  void connect() {
    if (_isDisposed || _isConnecting) return;
    _isConnecting = true;

    // A manual connect supersedes any scheduled retry and any old socket.
    _reconnectTimer?.cancel();
    _closeChannel();

    try {
      final wsUrl = dotenv.env['WS_URL'] ?? _fallbackUrl;
      if (kDebugMode) {
        debugPrint('WebSocket connecting to: $wsUrl');
        debugPrint('Available env vars: ${dotenv.env.keys.toList()}');
      }

      final channel = WebSocketChannel.connect(Uri.parse(wsUrl));
      _channel = channel;

      // Send subscription message after connection
      channel.sink.add(jsonEncode({
        'type': 'subscribe',
        'data': {'channel': 'price_updates'},
      }));

      // NOTE(review): this is emitted as soon as the channel object exists;
      // the handshake may still be in flight, in which case a failure is
      // reported later through the stream's error callback.
      _emitStatus('Connected');

      _subscription = channel.stream.listen(
        _handleMessage,
        onError: (Object error) {
          _log('WebSocket error: $error');
          _emitStatus('Error');
          _scheduleReconnect();
        },
        onDone: () {
          _log('WebSocket connection closed');
          // Forget the dead channel so requestHistory becomes a no-op
          // until the next successful connect.
          _subscription = null;
          _channel = null;
          _emitStatus('Disconnected');
          _scheduleReconnect();
        },
      );
    } catch (e) {
      // Deliberately broad: a bad URL or an unloaded dotenv must not crash
      // the caller; the attempt is simply retried later.
      _log('Failed to connect: $e');
      _closeChannel();
      _emitStatus('Error');
      _scheduleReconnect();
    } finally {
      _isConnecting = false;
    }
  }

  /// Decodes one incoming frame and dispatches it by its `type` field.
  ///
  /// Malformed frames are logged (debug builds only) and dropped so a single
  /// bad message cannot tear down the subscription.
  void _handleMessage(dynamic message) {
    try {
      final data = jsonDecode(message as String) as Map<String, dynamic>;
      final type = data['type'];
      _log('WebSocket message received: $type');

      switch (type) {
        case 'price_update':
          final latestPrice = LatestPrice.fromJson({'data': data['data']});
          _latestPriceController.add(latestPrice);
          break;
        case 'history_data':
          // Handle history data if needed
          break;
        case 'connection_status':
          _log('Connection status: ${data['data']}');
          break;
        case 'error':
          _log('WebSocket error message: ${data['data']}');
          break;
        default:
          _log('Unknown message type: $type');
      }
    } catch (e) {
      _log('Error parsing WebSocket message: $e');
    }
  }

  /// Schedules a single reconnect attempt, replacing any pending one.
  void _scheduleReconnect() {
    if (_isDisposed) return;
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(_reconnectDelay, () {
      _log('Attempting to reconnect...');
      connect();
    });
  }

  /// Asks the server for historical data covering [range].
  ///
  /// Silently does nothing while no channel is open.
  void requestHistory(String range) {
    _channel?.sink.add(jsonEncode({
      'type': 'request_history',
      'range': range,
    }));
  }

  /// Stops reconnecting, closes the socket and closes both output streams.
  ///
  /// Safe to call more than once. The service cannot be reused afterwards.
  void dispose() {
    if (_isDisposed) return;
    _isDisposed = true;
    _reconnectTimer?.cancel();
    _closeChannel();
    _latestPriceController.close();
    _connectionStatusController.close();
  }

  /// Detaches the listener from the current channel and closes its sink.
  ///
  /// The subscription is cancelled first so that closing the sink does not
  /// trigger the `onDone` handler (and with it a spurious reconnect).
  void _closeChannel() {
    _subscription?.cancel();
    _subscription = null;
    _channel?.sink.close();
    _channel = null;
  }

  /// Publishes [status] unless the status stream has already been closed.
  void _emitStatus(String status) {
    if (!_connectionStatusController.isClosed) {
      _connectionStatusController.add(status);
    }
  }

  /// Logs [message] in debug builds only.
  void _log(String message) {
    if (kDebugMode) {
      debugPrint(message);
    }
  }
}