import { Injectable, NgZone } from '@angular/core';
import { Router } from '@angular/router';
import { environment } from '@env/environment';
import { HubConnectionState } from '@microsoft/signalr';
import { ApiService } from '@portal-core/auth/services/api.service';
import { AuthService } from '@portal-core/auth/services/auth.service';
import { Build } from '@portal-core/builds/models/build.model';
import { BuildsService } from '@portal-core/builds/services/builds.service';
import { CurrentService } from '@portal-core/current/services/current.service';
import { DataStreamSignalRConnection } from '@portal-core/data-stream/util/data-stream-signalr-connection';
import { LicenseHostMap } from '@portal-core/license-host-maps/models/license-host-map.model';
import { LicenseHostMapsService } from '@portal-core/license-host-maps/services/license-host-maps.service';
import { LicenseStorageService } from '@portal-core/license-storage/services/license-storage.service';
import { LicenseBulkImportStep } from '@portal-core/license-users/enums/license-bulk-import-step.enum';
import { LicenseUserSeatType } from '@portal-core/license-users/enums/license-user-seat-type.enum';
import { LicenseBulkAddUserProgress } from '@portal-core/license-users/models/license-bulk-add-user-progress.model';
import { LicenseUsersService } from '@portal-core/license-users/services/license-users.service';
import { LicenseActivationType } from '@portal-core/licenses/enums/license-activation-type.enum';
import { License } from '@portal-core/licenses/models/license.model';
import { LicensesService } from '@portal-core/licenses/services/licenses.service';
import { ConversationsService } from '@portal-core/messages/services/conversations.service';
import { Notification } from '@portal-core/notifications/models/notification.model';
import { NotificationsService } from '@portal-core/notifications/services/notifications.service';
import { Permissions } from '@portal-core/permissions/models/permissions.model';
import { PermissionsService } from '@portal-core/permissions/services/permissions.service';
import { ProcessState } from '@portal-core/processes/enums/process-state.enum';
import { ProcessStatus } from '@portal-core/processes/enums/process-status.enum';
import { ProjectReportScanStatusWithLatestNotificationId } from '@portal-core/projects/models/project-report-scan-status-with-latest-notification-id.model';
import { UserOnlineStatus } from '@portal-core/users/models/user-online-status.model';
import { Observable, Subject, distinctUntilChanged, filter, interval, map, merge, mergeMap, startWith, switchMap, takeUntil } from 'rxjs';

/**
 * Manages the SignalR data stream connection for the logged in user.
 *
 * The connection is started whenever an access token is available and stopped when the token goes away.
 * Events pushed by the server are forwarded to the relevant services or re-published as observables on this service.
 */
@Injectable({
  providedIn: 'root'
})
export class DataStreamService {
  constructor(
    private authService: AuthService,
    private currentService: CurrentService,
    private licenseUsersService: LicenseUsersService,
    private notificationsService: NotificationsService,
    private permissionsService: PermissionsService,
    private buildsService: BuildsService,
    private conversationsService: ConversationsService,
    private licenseService: LicensesService,
    private licenseHostMapsService: LicenseHostMapsService,
    private licenseStorageService: LicenseStorageService,
    private router: Router,
    private ngZone: NgZone,
    private apiService: ApiService
  ) { }

  private buildCompletedSource: Subject<Build> = new Subject<Build>();
  /** The connection currently in use, or null when no connection is active. */
  private dataStreamSignalRConnection: DataStreamSignalRConnection | null = null;
  private hostmapCompletedSource: Subject<LicenseHostMap> = new Subject<LicenseHostMap>();
  private hubDisconnectedSource: Subject<void> = new Subject<void>();
  private hubReconnectedSource: Subject<void> = new Subject<void>();
  private initialized: boolean = false;
  private licenseBulkAddUserProgressSource: Subject<LicenseBulkAddUserProgress> = new Subject<LicenseBulkAddUserProgress>();
  private metaDataUpdatedSource: Subject<string> = new Subject<string>();
  private newReportNotificationSource: Subject<ProjectReportScanStatusWithLatestNotificationId> = new Subject<ProjectReportScanStatusWithLatestNotificationId>();
  /** Handle of the pending restart timer, if any. */
  private restartTimeoutId: ReturnType<typeof setTimeout> | undefined;
  private siteUrlsUpdatedSource: Subject<void> = new Subject<void>();

  public buildCompleted$: Observable<Build> = this.buildCompletedSource.asObservable();
  public hostmapCompleted$: Observable<LicenseHostMap> = this.hostmapCompletedSource.asObservable();
  public hubDisconnected$: Observable<void> = this.hubDisconnectedSource.asObservable();
  public hubReconnected$: Observable<void> = this.hubReconnectedSource.asObservable();
  public licenseBulkAddUserProgress$: Observable<LicenseBulkAddUserProgress> = this.licenseBulkAddUserProgressSource.asObservable();
  public metaDataUpdated$: Observable<string> = this.metaDataUpdatedSource.asObservable();
  public newReportNotification$ = this.newReportNotificationSource.asObservable();
  public siteUrlsUpdated$: Observable<void> = this.siteUrlsUpdatedSource.asObservable();

  /**
   * Initializes the service. Calls after the first one are ignored.
   * Starts the connection whenever a new access token is emitted and stops it when the token is cleared.
   */
  public init() {
    if (this.initialized) {
      return;
    }
    this.initialized = true;

    this.authService.getAccessToken$().pipe(
      distinctUntilChanged(),
    ).subscribe(accessToken => {
      if (accessToken) {
        this.start(accessToken);
      } else {
        this.stop();
      }
    });
  }

  /**
   * Emits the bulk add user progress events that belong to the given license.
   * @param licenseId The id of the license to listen to.
   * @param filterOutErrors When true, progress events that carry at least one error are dropped.
   * @returns An observable of the matching progress events.
   */
  public onLicenseBulkAddUserProgressByLicenseId$(licenseId: number, filterOutErrors: boolean = false): Observable<LicenseBulkAddUserProgress> {
    return this.licenseBulkAddUserProgress$.pipe(
      filter(licenseBulkAddUserProgress => {
        // Filter out any progress events from other licenses
        if (licenseBulkAddUserProgress.LicenseId !== licenseId) {
          return false;
        }

        // Filter out any progress events with errors if specified to do so
        const errorCount = licenseBulkAddUserProgress.Errors?.length ?? 0;
        return !(filterOutErrors && errorCount > 0);
      })
    );
  }

  /**
   * Emits when the host map with the given id is reported by the server as completed and flagged as deleted.
   * @param licenseHostMapId The id of the license host map to listen to.
   * @returns An observable of the deleted license host map.
   */
  public onLicenseHostMapDeletedByLicenseHostMapId$(licenseHostMapId: number): Observable<LicenseHostMap> {
    return this.hostmapCompleted$.pipe(
      filter(licenseHostMap => licenseHostMap.Id === licenseHostMapId && licenseHostMap.IsDeleted)
    );
  }

  /**
   * Emits the progress of a single bulk add user process.
   *
   * Progress normally arrives through SignalR. While SignalR is not connected the process report is polled instead,
   * and it is fetched once more whenever SignalR reconnects. Only finished or terminated reports are turned into progress events.
   * @param licenseId The id of the license the process belongs to. Used to build the synthesized progress events.
   * @param processId The id of the bulk add user process.
   * @param totalUsersRequested Currently unused by this method. Kept so that existing callers continue to work.
   * @returns An observable of the progress events for the process.
   */
  public onLicenseBulkAddUserProgressByProcessId$(licenseId: number, processId: number, totalUsersRequested: number): Observable<LicenseBulkAddUserProgress> {
    return merge(
      // Listen to bulk add user progress messages from SignalR
      this.licenseBulkAddUserProgress$.pipe(
        // Filter out any progress events from other bulk add requests
        filter(licenseBulkAddUserProgress => licenseBulkAddUserProgress.Id === processId)
      ),
      merge(
        this.hubDisconnected$.pipe(
          // Every disconnect event requires polling so mark it as not being the initial check
          map(() => false),
          // Start with a marker for the initial check. The connection state is read in the filter below so that it is
          // evaluated when the observable is subscribed to rather than when this method was called
          startWith(true),
          // For the initial check only continue if SignalR is not currently connected
          filter(isInitialCheck => !isInitialCheck || !this.isHubConnected()),
          // Switch to a new interval observable that is canceled when SignalR reconnects. Use a new observable so that takeUntil only completes the interval observable instead of the hubDisconnected$ observable
          switchMap(() => {
            // Poll for the bulk import status
            return interval(10000).pipe(
              // Fetch the bulk import status. Use mergeMap instead of switchMap so that previous requests are not canceled (which could be an issue if the requests are taking a long time)
              mergeMap(() => this.licenseUsersService.getBulkImportProcessReport$(processId)),
              // Stop polling if SignalR reconnects
              takeUntil(this.hubReconnected$)
            );
          })
        ),
        // Listen to SignalR reconnecting to get the latest state of the bulk import process
        this.hubReconnected$.pipe(
          // Fetch the bulk import status
          switchMap(() => this.licenseUsersService.getBulkImportProcessReport$(processId)),
        )
      ).pipe(
        // Filter out unfinished statuses. If the process is still running then normal SignalR messages will be received so we don't need to do anything
        // We also cannot get progress information from the bulk import status. We can only get if its completed or not
        filter(processReport => processReport.Status === ProcessStatus.Finished || processReport.Status === ProcessStatus.Terminated),
        // Create a LicenseBulkAddUserProgress model from the process report
        map(processReport => {
          if (processReport.Status === ProcessStatus.Finished) {
            return {
              Errors: null,
              Id: processId,
              CurrentStep: LicenseBulkImportStep.ImportComplete,
              LicenseId: licenseId,
              TotalSteps: LicenseBulkImportStep.ImportComplete, // This is the last step and its value is also the total number of steps
              Batches: 1, // if the process is finished, report 1/1 batch to set progress bar to the end
              CurrentBatch: 1 // if the process is finished, report 1/1 batch to set progress bar to the end
            };
          }

          // The filter above only lets finished and terminated reports through so this report is terminated
          return {
            Errors: [{
              Code: null,
              ExceptionMessage: null,
              InnerExceptionMessage: null,
              Message: processReport.Errors
            }],
            Id: processId,
            CurrentStep: LicenseBulkImportStep.Unknown,
            LicenseId: licenseId,
            TotalSteps: LicenseBulkImportStep.ImportComplete, // This is the last step and its value is also the total number of steps
            Batches: 1, // if the process is terminated, report 1/1 batch to set progress bar to the end
            CurrentBatch: 1 // if the process is terminated, report 1/1 batch to set progress bar to the end
          };
        })
      )
    );
  }

  /**
   * Returns true if there is a current connection and its hub reports the connected state.
   */
  private isHubConnected(): boolean {
    return this.dataStreamSignalRConnection?.hubConnection?.state === HubConnectionState.Connected;
  }

  /**
   * Listens to an event from the server and invokes a callback when the event occurs. The callback is run inside of the NG zone so that change detection works correctly.
   * Does nothing if there is no current connection.
   * @param name The name of the server event.
   * @param callback The function to invoke with the event arguments.
   */
  private on(name: string, callback: (...args: any[]) => void) {
    this.dataStreamSignalRConnection?.on(name, (...args: any[]) => {
      this.ngZone.run(() => {
        callback(...args);
      });
    });
  }

  /**
   * Logs an error if signalR logging is enabled.
   * @param error The error that occurred.
   * @param message The message to log with the error.
   */
  private onError(error: any, message: string = 'SignalR Error') {
    if (environment.signalrLoggingEnabled) {
      console.error(message, error);
    }
  }

  /**
   * Starts a new connection after a delay. After the delay the connection is only started if the user is still logged in.
   * Any restart that is already pending is replaced so that at most one restart timer is active.
   * @param delayMS The amount of milliseconds to wait before starting the new socket connection.
   */
  private restart(delayMS: number = 30000) {
    // Drop a pending restart so that two timers never race each other
    clearTimeout(this.restartTimeoutId);

    this.restartTimeoutId = setTimeout(() => {
      this.restartTimeoutId = undefined;

      if (this.authService.isAuthenticated()) {
        this.start(this.authService.getAccessToken());
      }
    }, delayMS);
  }

  /**
   * Starts up the socket connection. Retries to connect indefinitely.
   * @param accessToken The user's auth token.
   */
  private start(accessToken: string) {
    // Stop the current connection so that we start from a clean slate
    this.stop();

    // Initialize the new connection
    const connection = new DataStreamSignalRConnection(this.apiService.centralApiBaseUri, accessToken);
    this.dataStreamSignalRConnection = connection;
    this.subscribeToEvents();

    // Start the connection
    connection.start().catch(error => {
      this.onError(error, 'SignalR start');

      // Only retry if this connection is still the current one. If it has been replaced or stopped in the meantime
      // then a retry would tear down the newer connection or reconnect after a deliberate stop
      if (this.dataStreamSignalRConnection === connection) {
        this.restart();
      }
    });
  }

  /**
   * Stops the connection and stops listening to events from it.
   */
  private stop() {
    // Clear the restart timer since the connection is being stopped
    clearTimeout(this.restartTimeoutId);
    this.restartTimeoutId = undefined;

    this.dataStreamSignalRConnection?.stop();
    this.dataStreamSignalRConnection = null;
  }

  /**
   * Begins listening to events from the server on the current connection. Does nothing if there is no current connection.
   */
  private subscribeToEvents() {
    // Capture the connection so that handlers can tell whether they belong to the connection that is still in use
    const connection = this.dataStreamSignalRConnection;
    if (!connection) {
      return;
    }

    this.on('userOnlineStatusChanged', (users: UserOnlineStatus[]) => {
      this.licenseUsersService.setUserOnlineStatus(users);
    });

    this.on('notificationReceived', (notification: Notification) => {
      this.notificationsService.addDataStreamNotifications([notification]);
    });

    this.on('permissionsChanged', (licenseId: number, projectId: number, permissions: Permissions[]) => {
      this.permissionsService.updateUserPermissions$(licenseId, projectId, permissions).subscribe({
        error: error => this.onError(error, 'SignalR permissionsChanged.updateUserPermissions$')
      });
    });

    this.on('teamPermissionsChanged', (licenseId: number, teamId: number, projectId?: number) => {
      if (typeof projectId === 'number') {
        this.permissionsService.invalidateProjectPermissions$(projectId).subscribe({
          error: error => this.onError(error, 'SignalR onTeamPermissionsChanged.invalidateProjectPermissions$')
        });
      } else {
        this.permissionsService.invalidateLicensePermissions$(licenseId).subscribe({
          error: error => this.onError(error, 'SignalR onTeamPermissionsChanged.invalidateLicensePermissions$')
        });
      }
    });

    this.on('buildUpdated', (build: Build) => {
      this.buildsService.updateBuild$(build);
      // Guard against a build without a last notification so that the handler does not throw
      if (build.LastNotification?.ProcessState === ProcessState.Complete) {
        this.buildCompletedSource.next(build);
      }
    });

    this.on('metaDataUpdated', (commitId: string) => {
      this.metaDataUpdatedSource.next(commitId);
    });

    this.on('hostmapCompleted', (licenseHostmap: LicenseHostMap) => {
      // Only handle host maps for the active license
      if (licenseHostmap.LicenseId === this.currentService.getActiveLicenseId()) {
        this.licenseHostMapsService.updateItems$([licenseHostmap]);
        this.hostmapCompletedSource.next(licenseHostmap);
      }
    });

    this.on('siteUrlsUpdated', (licenseId: number) => {
      // Only handle site url updates for the active license
      if (licenseId === this.currentService.getActiveLicenseId()) {
        this.siteUrlsUpdatedSource.next();
      }
    });

    this.on('userHasNewNotifications', (userId: string, licenseId: number) => {
      this.licenseUsersService.setLicenseUserHasNewNotificationsByUserIdAndLicenseId(userId, licenseId, true);
    });

    this.on('userHasNewReportNotifications', (newStatus: ProjectReportScanStatusWithLatestNotificationId) => {
      this.newReportNotificationSource.next(newStatus);
    });

    this.on('userHasNewMessages', (licenseUserId: number) => {
      this.licenseUsersService.setLicenseUserHasNewMessages(licenseUserId, true);
    });

    this.on('purchaseCompleted', (license: License) => {
      const oldLicense = this.licenseService.getItemById(license.Id);
      // The license may not be loaded on the client yet. In that case skip the navigation but still update the stores below
      if (oldLicense && oldLicense.LicenseActivationStatus !== LicenseActivationType.Active) {
        // Route away from expired license alert page
        this.router.navigate(['/', license.Id, 'home']);
      }

      this.licenseService.updateItems$([license]);

      // Refresh the license storage but only if it is already being used on the client
      if (this.licenseStorageService.getItemById(license.Id)) {
        this.licenseStorageService.refreshItemById(license.Id);
      }
    });

    this.on('bulkUserProgress', (licenseBulkAddUserProgress: LicenseBulkAddUserProgress) => {
      this.licenseBulkAddUserProgressSource.next(licenseBulkAddUserProgress);
    });

    // Listen to the disconnect event to restart the connection
    connection.hubDisconnected$.subscribe(() => {
      this.hubDisconnectedSource.next();

      // Only restart if this connection is still the current one. A connection that was replaced or deliberately
      // stopped must not schedule a restart because that would tear down the newer connection
      if (this.dataStreamSignalRConnection === connection) {
        this.restart();
      }
    });

    // Listen to the reconnect event to get the latest state of the app
    connection.hubReconnected$.subscribe(() => {
      // Ignore events from a connection that is no longer in use or that is not actually connected
      if (this.dataStreamSignalRConnection !== connection || connection.hubConnection?.state !== HubConnectionState.Connected) {
        return;
      }

      const licenseUser = this.currentService.getLicenseUser();
      // Only authors get messages
      if (licenseUser && licenseUser.SeatType === LicenseUserSeatType.Author) {
        this.conversationsService.getMyConversationUnreadCountByLicenseId$(licenseUser.LicenseId).subscribe({
          next: unreadCount => {
            this.licenseUsersService.setLicenseUserHasNewMessages(licenseUser.Id, Number.isInteger(unreadCount) && unreadCount > 0);
          },
          error: error => this.onError(error, 'SignalR hubReconnected.getMyConversationUnreadCountByLicenseId$')
        });
      }

      this.permissionsService.resetUserPermissions$();
      connection.invoke('GetOnlineAsync');
      this.hubReconnectedSource.next();
    });
  }
}
