Files
rmtPocketWatcher/flutter_app/lib/services/websocket_service.dart
2025-12-14 21:53:46 -05:00

128 lines
3.7 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:flutter_dotenv/flutter_dotenv.dart';
import '../models/price_data.dart';
/// Keeps a WebSocket connection to the price feed open and republishes what
/// it receives on broadcast streams.
///
/// The endpoint is read from the `WS_URL` dotenv entry, falling back to
/// `ws://localhost:3000/ws/index`. When the connection fails or closes, a new
/// attempt is made after [_reconnectDelay] until [dispose] is called.
///
/// Status values published on [connectionStatusStream] are `'Connected'`,
/// `'Disconnected'` and `'Error'`.
class WebSocketService {
  /// Delay between losing (or failing to open) a connection and retrying.
  static const Duration _reconnectDelay = Duration(seconds: 5);

  /// The channel currently in use, or `null` when there is none.
  WebSocketChannel? _channel;

  /// Subscription to [_channel]'s incoming stream; cancelled on teardown so
  /// callbacks of a replaced channel can no longer fire.
  StreamSubscription<dynamic>? _subscription;

  final _latestPriceController = StreamController<LatestPrice>.broadcast();
  final _connectionStatusController = StreamController<String>.broadcast();
  Timer? _reconnectTimer;

  /// Whether a connection attempt is in flight (opened but not yet ready).
  bool _isConnecting = false;

  /// Whether [dispose] has been called; once set, nothing is emitted and no
  /// reconnect is scheduled.
  bool _isDisposed = false;

  /// Prices decoded from incoming `price_update` messages.
  Stream<LatestPrice> get latestPriceStream => _latestPriceController.stream;

  /// Connection status changes, as plain strings.
  Stream<String> get connectionStatusStream =>
      _connectionStatusController.stream;

  /// Opens the WebSocket connection and subscribes to `price_updates`.
  ///
  /// Does nothing if an attempt is already in flight or the service has been
  /// disposed. Any previously opened channel is closed first so that at most
  /// one connection exists at a time. Failures are reported as `'Error'` on
  /// [connectionStatusStream] and trigger an automatic retry.
  void connect() {
    if (_isDisposed || _isConnecting) return;
    _isConnecting = true;

    // An explicit connect supersedes any pending automatic retry, and the
    // previous channel (if any) must not be left open behind the new one.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _closeChannel();

    try {
      final wsUrl = dotenv.env['WS_URL'] ?? 'ws://localhost:3000/ws/index';
      if (kDebugMode) {
        print('WebSocket connecting to: $wsUrl');
        print('Available env vars: ${dotenv.env.keys.toList()}');
      }

      final channel = WebSocketChannel.connect(Uri.parse(wsUrl));
      _channel = channel;

      _subscription = channel.stream.listen(
        _handleMessage,
        onError: (Object error) {
          if (kDebugMode) {
            print('WebSocket error: $error');
          }
          _handleConnectionLost(channel, 'Error');
        },
        onDone: () {
          if (kDebugMode) {
            print('WebSocket connection closed');
          }
          _handleConnectionLost(channel, 'Disconnected');
        },
      );

      // `connect` returns before the handshake finishes, so 'Connected' is
      // only reported once the channel says it is ready. Observing the error
      // here also keeps a failed handshake from becoming an unhandled error.
      channel.ready.then<void>(
        (_) => _handleReady(channel),
        onError: (Object error) {
          if (kDebugMode) {
            print('Failed to connect: $error');
          }
          _handleConnectionLost(channel, 'Error');
        },
      );
    } catch (e) {
      // Synchronous failures, e.g. a malformed WS_URL.
      if (kDebugMode) {
        print('Failed to connect: $e');
      }
      _isConnecting = false;
      _closeChannel();
      _emitStatus('Error');
      _scheduleReconnect();
    }
  }

  /// Sends the subscription request and reports `'Connected'` once [channel]
  /// is ready, unless it has been replaced or the service disposed meanwhile.
  void _handleReady(WebSocketChannel channel) {
    if (_isDisposed || !identical(channel, _channel)) return;
    _isConnecting = false;
    channel.sink.add(jsonEncode({
      'type': 'subscribe',
      'data': {'channel': 'price_updates'}
    }));
    _emitStatus('Connected');
  }

  /// Decodes one incoming frame and dispatches on its `type` field.
  ///
  /// Frames that are not JSON object strings are logged (debug builds only)
  /// and dropped.
  void _handleMessage(dynamic message) {
    if (_isDisposed) return;
    try {
      final data = jsonDecode(message as String) as Map<String, dynamic>;
      if (kDebugMode) {
        print('WebSocket message received: ${data['type']}');
      }
      switch (data['type']) {
        case 'price_update':
          final latestPrice = LatestPrice.fromJson({'data': data['data']});
          _latestPriceController.add(latestPrice);
          break;
        case 'history_data':
          // Handle history data if needed
          break;
        case 'connection_status':
          if (kDebugMode) {
            print('Connection status: ${data['data']}');
          }
          break;
        case 'error':
          if (kDebugMode) {
            print('WebSocket error message: ${data['data']}');
          }
          break;
        default:
          if (kDebugMode) {
            print('Unknown message type: ${data['type']}');
          }
      }
    } catch (e) {
      if (kDebugMode) {
        print('Error parsing WebSocket message: $e');
      }
    }
  }

  /// Tears down [channel], publishes [status] and schedules a reconnect.
  ///
  /// Ignored when [channel] is no longer the active one, so a failure that is
  /// signalled through more than one callback is handled only once.
  void _handleConnectionLost(WebSocketChannel channel, String status) {
    if (_isDisposed || !identical(channel, _channel)) return;
    _isConnecting = false;
    _closeChannel();
    _emitStatus(status);
    _scheduleReconnect();
  }

  /// Cancels the stream subscription and closes the sink of the active
  /// channel, if any, and clears both references.
  void _closeChannel() {
    final subscription = _subscription;
    final channel = _channel;
    _subscription = null;
    _channel = null;
    unawaited(subscription?.cancel());
    // Best effort: a failure to close an already-dead socket is not
    // actionable here.
    channel?.sink.close().ignore();
  }

  /// Publishes [status] unless the status controller is already closed.
  void _emitStatus(String status) {
    if (_connectionStatusController.isClosed) return;
    _connectionStatusController.add(status);
  }

  /// Schedules a single reconnect attempt after [_reconnectDelay], replacing
  /// any attempt that is already pending.
  void _scheduleReconnect() {
    if (_isDisposed) return;
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(_reconnectDelay, () {
      if (kDebugMode) {
        print('Attempting to reconnect...');
      }
      connect();
    });
  }

  /// Sends a `request_history` message for [range] over the active channel.
  ///
  /// Does nothing when there is no active channel.
  void requestHistory(String range) {
    _channel?.sink.add(jsonEncode({
      'type': 'request_history',
      'range': range,
    }));
  }

  /// Stops reconnecting, closes the connection and closes both streams.
  ///
  /// Safe to call more than once. After disposal [connect] is a no-op.
  void dispose() {
    if (_isDisposed) return;
    _isDisposed = true;
    _isConnecting = false;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    // The subscription is cancelled before the controllers are closed so no
    // late onDone/onError callback can add to a closed controller.
    _closeChannel();
    unawaited(_latestPriceController.close());
    unawaited(_connectionStatusController.close());
  }
}