import {
  distinctUntilChanged,
  filter,
  first,
  map,
  pipe,
  repeat,
  ReplaySubject,
  retry,
  share,
  take,
  takeWhile,
  tap,
  throwError,
  timer,
  withLatestFrom,
  type MonoTypeOperatorFunction,
  type Observable,
  type ObservableInput,
  type ObservableInputTuple,
  type OperatorFunction,
} from 'rxjs';

/**
 * Builds an operator that drops `null` and `undefined` emissions and narrows
 * the element type of the resulting stream to `NonNullable<T>`.
 *
 * @example
 * ```ts
 * this.observable$.pipe(
 *   isNonNullable()
 * );
 * ```
 * @returns a function mapping `Observable<T | null | undefined>` to `Observable<NonNullable<T>>`
 */
export function isNonNullable<T>() {
  // Loose comparison against null matches both null and undefined.
  const hasValue = (candidate: null | undefined | T): candidate is NonNullable<T> => candidate != null;

  return (source$: Observable<null | undefined | T>) => source$.pipe(filter(hasValue));
}

/**
 * Type guard that reports whether a value is neither `null` nor `undefined`.
 * It is a plain predicate (not an operator), so it can be handed to RxJS
 * `filter` or to `Array.prototype.filter`.
 *
 * @example
 * ```ts
 * this.observable$.pipe(filter(isDefined))
 * ```
 * @param arg value to inspect
 * @returns `true` when `arg` holds a value; the type is narrowed to exclude `null` and `undefined`
 */
export function isDefined<T>(arg: T | null | undefined): arg is T extends null | undefined ? never : T {
  const isMissing = arg === null || arg === undefined;
  return !isMissing;
}

/**
 * RxJS operator that lets only non-nullish values through, narrowing the
 * output type to `NonNullable<T>`.
 *
 * @example
 * ```ts
 * const result$ = source$.pipe(filterNil());
 * ```
 * @returns operator that removes `null` and `undefined` emissions
 */
export function filterNil<T>(): OperatorFunction<T, NonNullable<T>> {
  // Loose comparison against null matches both null and undefined.
  const isPresent = (candidate: T): candidate is NonNullable<T> => candidate != null;

  return filter(isPresent);
}

/**
 * RxJS operator that suppresses an emitted array when it is shallowly equal to
 * the previously emitted one: either the very same array instance, or an array
 * of the same length whose items are identical by reference at every index.
 *
 * @example
 * ```ts
 * this.observable$.pipe(distinctUntilArrayItemChanged());
 * ```
 * @returns operator that only forwards arrays whose length or item references changed
 */
export function distinctUntilArrayItemChanged<T>(): MonoTypeOperatorFunction<T[]> {
  // A `true` result tells distinctUntilChanged the value is a duplicate and must be skipped.
  const holdsSameItems = (previous: T[], current: T[]): boolean =>
    previous === current ||
    (previous.length === current.length && current.every((item, index) => previous[index] === item));

  return distinctUntilChanged(holdsSameItems);
}

/**
 * RxJS operator that emits the first value satisfying `predicateFn` and then completes.
 *
 * What happens when the source completes without a match depends on `required`:
 * - `false` (default): the result completes without emitting anything.
 * - `true`: the result errors, as RxJS `first` does (`EmptyError`).
 *
 * @example
 * ```ts
 * const source$ = of(1, 2, 3);
 *
 * const result3$ = source$.pipe(firstMatching((v) => v > 3));
 * result3$.subscribe({
 *  next: (v) => console.log(`[result3]: ${v}`),
 *  error: (err) => console.log(`[result3]: ${err}`),
 *  complete: () => console.log('[result3]: completed'),
 * });
 * // [result3]: completed
 *
 * const result4$ = source$.pipe(firstMatching((v) => v > 3, true));
 * result4$.subscribe({
 *  next: (v) => console.log(`[result4]: ${v}`),
 *  error: (err) => console.log(`[result4]: ${err}`),
 *  complete: () => console.log('[result4]: completed'),
 * });
 * // [result4]: EmptyError: no elements in sequence
 * ```
 * @param predicateFn condition the emitted value has to fulfil
 * @param required whether a missing match is reported as an error
 */
export function firstMatching<T>(
  predicateFn: (v: T) => boolean,
  required: boolean = false,
): MonoTypeOperatorFunction<T> {
  if (required) {
    // `first` raises an error when the source completes without a match.
    return first(predicateFn);
  }

  // filter + take(1) simply completes when nothing matched.
  return pipe(filter(predicateFn), take(1));
}

/**
 * RxJS operator that shares one subscription to the source and replays its last
 * emitted value to late subscribers.
 *
 * After the source completes, the cached value stays available for `ttl`
 * milliseconds; once that time has elapsed the next subscriber triggers a fresh
 * subscription to the source. With the default `ttl` of `Infinity` the cached
 * value never expires after completion.
 *
 * All other reset options are left at the `share` defaults.
 *
 * @example
 * ```ts
 * const result$ = response$.pipe(cache(7000));
 * ```
 * @param ttl lifetime of the cached value in milliseconds, counted from completion of the source
 */
export function cache<T>(ttl: number = Infinity): MonoTypeOperatorFunction<T> {
  // Platform timers store the delay as a signed 32-bit integer. Anything larger
  // (including Infinity) is clamped to roughly 0-1 ms, so `timer(Infinity)` would
  // reset the cache almost immediately instead of never.
  const maxTimerDelay = 2147483647;
  const expires = Number.isFinite(ttl) && ttl <= maxTimerDelay;

  return share({
    connector: () => new ReplaySubject<T>(1),
    // `false` keeps the completed ReplaySubject around indefinitely.
    resetOnComplete: expires ? () => timer(ttl) : false,
  });
}

/**
 * RxJS operator that logs lifecycle events of a stream to the console, each
 * line prefixed with `[streamId]`. Values pass through untouched.
 *
 * Logged events: subscription, unsubscription, and finalization.
 *
 * @example
 * ```ts
 * const source1$ = of(1, 2, 3).pipe(withLifecycle('source1'));
 * ```
 * @param streamId label used to tell the log lines of different streams apart
 */
export function withLifecycle<T>(streamId: string): MonoTypeOperatorFunction<T> {
  const report = (event: string): void => console.log(`[${streamId}]: ${event}`);

  return tap({
    subscribe: () => report('subscribed'),
    unsubscribe: () => report('unsubscribed'),
    finalize: () => report('emitted final value'),
  });
}

/** Options accepted by `pollWhile`. */
interface PollWhileConfig<T> {
  /** Polling goes on for as long as this returns `true` for the emitted value. */
  predicateFn: (v: T) => boolean;
  /** Forwarded to RxJS `repeat` as the delay between re-subscriptions to the source. */
  delay: number;
  /** Forwarded to RxJS `repeat` as its `count`; `pollWhile` uses `Infinity` when omitted. */
  count?: number;
  /**
   * When `true`, only the first value for which `predicateFn` returns `false` is emitted.
   * `pollWhile` uses `false` when omitted, which also emits the intermediate values.
   */
  lastOnly?: boolean;
}

/**
 * RxJS operator that keeps re-subscribing to the source (polling) for as long
 * as the emitted values satisfy `predicateFn`.
 *
 * - `lastOnly: false` (default): every value is forwarded, including the first
 *   one that no longer satisfies `predicateFn`; the stream completes after it.
 * - `lastOnly: true`: only the first value that no longer satisfies
 *   `predicateFn` is forwarded, then the stream completes.
 *
 * `delay` and `count` are handed to RxJS `repeat` unchanged.
 *
 * @example
 * ```ts
 * const analysisStates: AnalysisState<string>[] = [
 *  { status: 'pristine', data: null },
 *  { status: 'pending', data: 'xy' },
 *  { status: 'pending', data: 'xyz' },
 *  { status: 'completed', data: 'xyz|abc' },
 * ];
 * // `from` emits the states one by one (`of(analysisStates)` would emit the whole array once)
 * const response$ = from(analysisStates);
 * const result$ = response$.pipe(
 *  pollWhile({
 *    predicateFn: ({ status }) => status !== 'completed',
 *    delay: 2000,
 *    // count: 5,
 *    // lastOnly: true,
 *  })
 * );
 * ```
 */
export function pollWhile<T>({
  predicateFn,
  delay,
  count = Infinity,
  lastOnly = false,
}: PollWhileConfig<T>): MonoTypeOperatorFunction<T> {
  const resubscribe = repeat<T>({ delay, count });

  if (lastOnly) {
    // Swallow everything until the predicate fails, emit that single value and stop.
    return pipe(
      resubscribe,
      filter((value: T) => !predicateFn(value)),
      take(1),
    );
  }

  // Inclusive takeWhile: the value that ends the polling is emitted as well.
  return pipe(resubscribe, takeWhile(predicateFn, true));
}

/** Error shape read by `retryForStatus`: an `Error` that carries a numeric `status`. */
interface ErrorWithStatus extends Error {
  /** Value looked up in `retryableStatuses`. Presumably an HTTP response status — confirm with callers. */
  status: number;
}

/** Options accepted by `retryForStatus`. */
interface RetryForStatusConfig {
  /** Errors are retried only when their `status` is contained in this list. */
  retryableStatuses: number[];
  /** Base wait time; `retryForStatus` multiplies it by the retry attempt number and passes the result to `timer`. */
  delay: number;
  /** Forwarded to RxJS `retry` as its `count`; `retryForStatus` uses `Infinity` when omitted. */
  count?: number;
}

/**
 * RxJS operator that re-subscribes to the source after an error, but only when
 * the error carries a `status` listed in `retryableStatuses`.
 *
 * The wait before each retry grows linearly: attempt `n` waits `n * delay`
 * (attempt numbering as provided by RxJS `retry`). Errors without a retryable
 * status, including errors that are not objects or have no `status` at all,
 * are passed downstream unchanged.
 *
 * @example
 * ```ts
 * const result$ = response$.pipe(
 *  retryForStatus({ retryableStatuses: [500, 501], delay: 2000, count: 1 })
 * );
 * ```
 */
export function retryForStatus<T>({
  retryableStatuses,
  delay,
  count = Infinity,
}: RetryForStatusConfig): MonoTypeOperatorFunction<T> {
  return retry({
    count,
    delay: (err: unknown, retryCount: number) => {
      // Anything can be thrown into a stream, so read `status` defensively:
      // dereferencing a null/undefined error here would replace it with a TypeError.
      const status = (err as Partial<ErrorWithStatus> | null | undefined)?.status;
      const isRetryable = status !== undefined && retryableStatuses.includes(status);

      return isRetryable ? timer(retryCount * delay) : throwError(() => err);
    },
  });
}

/**
 * RxJS operator that replaces every emission of the source with the latest
 * values of the given supplementary streams.
 *
 * The built-in `withLatestFrom` prepends the source value to the result. When
 * the source is merely a trigger, that first element is noise; this operator
 * drops it. With a single supplementary stream the bare value is emitted, with
 * several streams a tuple of their latest values is emitted.
 *
 * @example
 * ```ts
 * const trigger$ = timer(2000).pipe(map(() => 'click'));
 * const latestData1$ = of(1, 2, 3);
 * const latestData2$ = of('a', 'b');
 * const result$ = trigger$.pipe(toLatestFrom(latestData1$, latestData2$));
 * result$.subscribe({
 *  next: (v) => console.log(`[result]: ${v}`),
 *  error: (err) => console.log(`[result]: ${err}`),
 *  complete: () => console.log('[result]: completed'),
 * });
 * // [result]: 3,b
 * // [result]: completed
 * ```
 */
export function toLatestFrom<T, D1>(d1$: ObservableInput<D1>): OperatorFunction<T, D1>;
export function toLatestFrom<T, D1, D extends unknown[]>(
  d1$: ObservableInput<D1>,
  ...data$: [...ObservableInputTuple<D>]
): OperatorFunction<T, [D1, ...D]>;

export function toLatestFrom<D1, D extends unknown[]>(
  d1$: ObservableInput<D1>,
  ...data$: [...ObservableInputTuple<D>]
) {
  return pipe(
    withLatestFrom(d1$, ...data$),
    // The first slot holds the trigger value and is intentionally skipped.
    map(([, ...latest]) => {
      if (latest.length === 1) {
        // Single supplementary stream: unwrap instead of emitting a one-element tuple.
        return latest[0];
      }

      return latest;
    }),
  );
}
