/**
 * This plugin adds the RxDatabase.syncGraphQL() function to RxDB.
 * Use it to sync the collections of a database with a remote GraphQL endpoint.
 */

import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
import { filter, first } from 'rxjs/operators';
import GraphQLClient from 'graphql-client';

// @ts-ignore
import { flatClone, hash, now, promiseWait } from 'rxdb/dist/lib/util';

import { addRxPlugin, RxDatabase } from 'rxdb/plugins/core';

import {
  createRevisionForPulledDocument,
  DEFAULT_MODIFIER,
  getDocsWithRevisionsFromPouch,
  wasRevisionfromPullReplication,
} from './helper';
import {
  getChangesSinceLastPushSequence,
  getLastPullDocument,
  setLastPullDocument,
  setLastPushSequence,
} from './crawling-checkpoint';

import { RxDBWatchForChangesPlugin } from 'rxdb/plugins/watch-for-changes';
import { RxDBLeaderElectionPlugin } from 'rxdb/plugins/leader-election';
// @ts-ignore
import { changeEventfromPouchChange } from 'rxdb/dist/lib/rx-change-event';
// @ts-ignore
import { overwritable } from 'rxdb/dist/lib/overwritable';
import type {
  GraphQLSyncPullOptions,
  GraphQLSyncPushOptions,
  RxCollection,
  RxPlugin,
} from 'rxdb/dist/types';
import { debounce, groupBy, maxBy } from 'lodash';
import { logDebug } from '../../logger';
import { subMilliseconds } from 'date-fns';

/**
 * Type-guard signature that narrows `Z | null` to `Z`.
 * `Boolean` is cast to this type in `runPush()` so that `Array.filter`
 * removes the `null` entries from the result type as well as at runtime.
 */
export declare type ExcludesNull = <Z>(x: Z | null) => x is Z;
// Register the leader-election plugin as a module-level side effect.
// syncGraphQL() calls db.waitForLeadership() when `waitForLeadership` is set;
// presumably this plugin is what provides that method (confirm against RxDB).
addRxPlugin(RxDBLeaderElectionPlugin);

/**
 * Add the watch-for-changes plugin
 * so PouchDB will emit events when something gets written to it.
 * syncGraphQL() calls collection.watchForChanges() on every synced collection.
 */
addRxPlugin(RxDBWatchForChangesPlugin);

/**
 * A document as exchanged with the endpoint.
 * `itemType` carries the name of the collection the document belongs to:
 * it is set to `collection.name` before pushing and is used to group pulled
 * documents by collection.
 */
interface DataType extends Record<string, any> {
  itemType: string;
}

/**
 * A pulled document after its `itemType` routing field has been deleted,
 * i.e. the shape that is validated and written to the local collection.
 */
interface DataTypeWithoutItemType extends Record<string, any> {
  itemType?: undefined;
}

/**
 * A local change prepared for pushing: the document to send together with the
 * `seq` value of the change it came from (used to store the push checkpoint).
 */
type SecDataType = { doc: DataType; seq: any };

/**
 * Bounded FIFO list of timestamps.
 * Only the most recent `limit` entries are kept; older ones are dropped
 * when a new entry is added.
 */
class LimitedDateList {
  private list: Date[] = [];
  private readonly limit: number;

  /**
   * @param limit maximum number of dates to keep. Negative values are treated
   *   as 0; otherwise trimming in `add()` could never terminate, because an
   *   empty list is still longer than a negative limit.
   */
  constructor(limit: number) {
    this.limit = Math.max(0, limit);
  }

  /** Appends `d` and drops the oldest entries until at most `limit` remain. */
  add(d: Date) {
    this.list.push(d);
    while (this.list.length > this.limit) {
      this.list.shift();
    }
  }

  /**
   * Counts the stored dates that are not older than `lookBack` milliseconds.
   * @param lookBack number of milliseconds to look back from now
   * @returns number of stored dates within that window
   */
  getLatestCount(lookBack: number) {
    const earliest = subMilliseconds(new Date(), lookBack);
    return this.list.filter((d) => d >= earliest).length;
  }
}

/**
 * State and control object of one GraphQL replication that syncs several
 * collections with a single endpoint.
 *
 * A run (`_run`) first pushes local changes of every collection and, only if
 * that succeeded, pulls remote documents. Progress and failures are published
 * through the subjects in `_subjects`, each of which is also exposed as an
 * observable named `<key>$`.
 */
export class RxGraphQLReplicationState {
  /**
   * @param collections collections that take part in the replication
   * @param url URL of the GraphQL endpoint
   * @param headers headers passed to the GraphQL client
   * @param pull pull options (query builder and modifier); falsy disables pulling
   * @param push push options (query builder, modifier, batch size); falsy disables pushing
   * @param deletedFlag name of the document field that carries the deleted state on the remote side
   * @param lastPulledRevField name of the document field that stores the last pulled revision
   * @param tsField name of the field used to pick the newest "last pulled" document across collections
   * @param live when false, the replication counts as stopped once the initial replication is complete
   * @param liveInterval interval in ms between live pulls (read by the caller that drives the loop)
   * @param retryTime delay in ms before a failed run is retried
   * @param numRetries number of retries after the original failure
   * @param syncRevisions when true, revisions are exchanged with the endpoint instead of being generated locally
   */
  constructor(
    public readonly collections: RxCollection[],
    public readonly url: string,
    public headers: { [k: string]: string },
    public readonly pull: GraphQLSyncPullOptions,
    public readonly push: GraphQLSyncPushOptions,
    public readonly deletedFlag: string,
    public readonly lastPulledRevField: string,
    public readonly tsField: string,
    public readonly live: boolean,
    public liveInterval: number,
    public retryTime: number,
    public numRetries: number,
    public readonly syncRevisions: boolean
  ) {
    this.client = GraphQLClient({
      url,
      headers,
    });
    this.endpointHash = hash(url);
    this._prepare();
  }
  public client: any;
  public endpointHash: string;
  public _subjects = {
    recieved: new Subject(), // all documents that are received from the endpoint
    successfulPull: new Subject(), // emits true after pull succeeds, for multi page pull emits for each page
    send: new Subject(), // all documents that are sent to the endpoint
    error: new Subject(), // all errors that are received from the endpoint, emits new Error() objects
    canceled: new BehaviorSubject(false), // true when the replication was canceled
    active: new BehaviorSubject(false), // true when something is running, false when not
    initialReplicationComplete: new BehaviorSubject(false), // true when the initial replication-cycle is over
  };
  // tail of the serialized run queue; every run is chained onto it
  public _runningPromise: Promise<void> = Promise.resolve();
  public _subs: Subscription[] = [];

  public _runQueueCount = 0;
  public _runCount = 0; // used in tests

  // The observables below are placeholders for the type checker only;
  // _prepare() replaces each of them with a getter on the matching subject.
  public initialReplicationComplete$: Observable<any> = undefined as any;

  public recieved$: Observable<any> = undefined as any;
  public send$: Observable<any> = undefined as any;
  public error$: Observable<any> = undefined as any;
  public successfulPull$: Observable<any> = undefined as any;
  public canceled$: Observable<any> = undefined as any;
  public active$: Observable<boolean> = undefined as any;

  /**
   * things that are too complex to belong into the constructor
   */
  _prepare() {
    // stop sync when collection gets destroyed

    for (const collection of this.collections) {
      collection.onDestroy.then(() => {
        // simply cancel if ANY is destroyed for now
        this.cancel();
      });
    }

    // create getters for the observables
    Object.keys(this._subjects).forEach((key) => {
      Object.defineProperty(this, key + '$', {
        get: function () {
          return this._subjects[key].asObservable();
        },
      });
    });
  }

  /**
   * @returns true when the replication was canceled, or when it is not live
   *   and the initial replication has completed
   */
  isStopped(): boolean {
    if (!this.live && this._subjects.initialReplicationComplete.getValue()) {
      return true;
    }
    if (this._subjects.canceled.getValue()) return true;
    return false;
  }

  /**
   * @returns a promise that resolves once `initialReplicationComplete$` emits true
   */
  awaitInitialReplication(): Promise<true> {
    return this.initialReplicationComplete$
      .pipe(
        filter((v) => v === true),
        first()
      )
      .toPromise();
  }

  /**
   * Queues one run behind the runs that are already pending, which ensures
   * this._run() does not run in parallel.
   *
   * An exception escaping `_run()` is published on the `error` subject instead
   * of rejecting the queue. Otherwise `active` would stay true, the queue
   * counter would never be decremented and every later run, being chained on
   * the rejected promise, would be skipped.
   *
   * @param retryOnFail forwarded to `_run()`; the initial replication is only
   *   marked complete by runs started with `retryOnFail = true`
   * @returns the promise of the queued run, or of the current queue tail when
   *   more than two runs are already waiting
   */
  async runNotDebounced(retryOnFail = true): Promise<void> {
    if (this.isStopped()) {
      return;
    }

    if (this._runQueueCount > 2) {
      return this._runningPromise;
    }

    this._runQueueCount++;
    this._runningPromise = this._runningPromise.then(async () => {
      this._subjects.active.next(true);
      let succeeded = false;
      try {
        const willRetry = await this._run(retryOnFail);
        succeeded = !willRetry;
      } catch (err) {
        this._subjects.error.next(err);
      }
      this._subjects.active.next(false);
      if (
        retryOnFail &&
        succeeded &&
        this._subjects.initialReplicationComplete.getValue() === false
      ) {
        this._subjects.initialReplicationComplete.next(true);
      }
      this._runQueueCount--;
    });
    return this._runningPromise;
  }

  /**
   * Debounced (500 ms) variant of `runNotDebounced`.
   * The method is passed unbound, so `run` must be invoked as a method
   * (`state.run()`) for `this` to reach `runNotDebounced`.
   */
  run = debounce(this.runNotDebounced, 500);

  // handles of the pending retry timers, so cancel() can clear them
  private runTimeouts: Set<number> = new Set([]);

  // store more than num retries to also store the initial failure
  private lastPushFailures: LimitedDateList = new LimitedDateList(
    this.numRetries + 1
  );
  private lastPullFailures: LimitedDateList = new LimitedDateList(
    this.numRetries + 1
  );

  /** Time window in ms in which failures count as "recent". */
  private getLookBack() {
    return this.retryTime * (this.numRetries + 2);
  }

  /**
   * Schedules another run after `retryTime` unless the number of recent
   * failures has reached the allowed total (numRetries plus the original failure).
   * @param failures failure log of the direction that just failed
   * @param direction used in the debug message only
   */
  private scheduleRetry(
    failures: LimitedDateList,
    direction: 'push' | 'pull'
  ): void {
    const recentFailures = failures.getLatestCount(this.getLookBack());
    if (recentFailures < this.numRetries + 1) {
      const to = (setTimeout(() => {
        this.runTimeouts.delete(to);
        this.run();
      }, this.retryTime) as unknown) as number;
      this.runTimeouts.add(to);
    } else {
      logDebug(['sync'], `No more ${direction} retries!`);
    }
  }

  /**
   * Executes one replication cycle: push first, then pull.
   *
   * returns true if retry must be done
   * in the current set up, if retryOnFail=false, return value does not matter
   */
  async _run(retryOnFail = true): Promise<boolean> {
    this._runCount++;

    if (this.push) {
      const ok = await this.runPush();
      if (!ok) {
        this.lastPushFailures.add(new Date());
        if (retryOnFail) {
          this.scheduleRetry(this.lastPushFailures, 'push');
        }
        /*
          Because we assume that conflicts are solved on the server side,
          if push failed, do not attempt to pull before push was successful
          otherwise we do not know how to merge changes with the local state
        */
        return retryOnFail;
      }
    }

    if (this.pull) {
      const ok = await this.runPull();
      if (!ok) {
        this.lastPullFailures.add(new Date());
        if (retryOnFail) {
          this.scheduleRetry(this.lastPullFailures, 'pull');
        }
        return retryOnFail;
      }
    }

    return false;
  }

  /**
   * Converts a failed GraphQL response into a thrown Error. Never returns.
   * Checked in this order: `errors` (string, then first entry's message, then
   * a generic error carrying `innerErrors`), `error`, `message`, and finally
   * a generic "empty data" error.
   * @param result response object of the GraphQL client
   * @throws Error always
   */
  throwErrorOnResponse(result: any): never {
    if (result.errors) {
      if (typeof result.errors === 'string') {
        throw new Error(result.errors);
      } else if (typeof result.errors[0]?.message === 'string') {
        // `?.` keeps an empty errors list from raising a TypeError here
        throw new Error(result.errors[0].message);
      } else {
        const err: any = new Error(
          'unknown errors occurred - see innerErrors for more details'
        );
        err.innerErrors = result.errors;
        throw err;
      }
    }
    if (result.error) {
      throw new Error(result.error);
    }
    if (result.message) {
      throw new Error(result.message);
    }
    throw new Error('unknown errors: empty data');
  }

  /**
   * Pulls documents from the endpoint page by page until a page yields no
   * documents (after the pull modifier), writes them into their collections
   * and stores a per-collection pull checkpoint.
   * Request and validation errors are emitted on the `error` subject.
   * @return true if successful
   */
  async runPull(): Promise<boolean> {
    if (this.isStopped()) return false;

    // the query starts from the newest checkpoint document of all collections
    const latestDocuments: any[] = [];
    for (const collection of this.collections) {
      const latestDocument = await getLastPullDocument(
        collection,
        this.endpointHash
      );
      latestDocuments.push(latestDocument);
    }
    const docWithMaxTs = maxBy(latestDocuments, this.tsField);
    const latestDocumentData = docWithMaxTs ? docWithMaxTs : null;
    const pullGraphQL = await this.pull.queryBuilder(latestDocumentData);

    let result;
    try {
      result = await this.client.query(pullGraphQL.query, pullGraphQL.variables);
      if (result.errors || result.data === undefined || result.data === null) {
        this.throwErrorOnResponse(result);
      }
    } catch (err) {
      this._subjects.error.next(err);
      return false;
    }

    // this assumes that there will be always only one property in the response
    // is this correct?
    const data: DataType[] = result.data[Object.keys(result.data)[0]];
    // documents for which the modifier returns a falsy value are skipped
    const modified: DataType[] = (
      await Promise.all(
        data.map(async (doc: any) => await (this.pull as any).modifier(doc))
      )
    ).filter((doc) => !!doc);
    // `itemType` names the collection a document belongs to
    const grouped: { [key in RxCollection['name']]: DataType[] } = groupBy(
      modified,
      'itemType'
    );

    for (const collection of this.collections) {
      const collectionDataWithItemType = grouped[collection.name];
      if (!collectionDataWithItemType) continue;

      // itemType is routing information only; remove it before storing
      const collectionData: DataTypeWithoutItemType[] = collectionDataWithItemType.map(
        (r) => {
          const newR: DataTypeWithoutItemType = (r as unknown) as DataTypeWithoutItemType;
          delete newR['itemType'];
          return newR;
        }
      );
      /**
       * Run schema validation in dev-mode
       */
      if (overwritable.isDevMode()) {
        try {
          collectionData.forEach((doc: any) => {
            const withoutDeleteFlag = Object.assign({}, doc);
            delete withoutDeleteFlag[this.deletedFlag];
            delete withoutDeleteFlag._revisions;
            collection.schema.validate(withoutDeleteFlag);
          });
        } catch (err) {
          this._subjects.error.next(err);
          return false;
        }
      }

      const docIds = collectionData.map(
        (doc: any) => doc[collection.schema.primaryPath]
      );
      const docsWithRevisions = await getDocsWithRevisionsFromPouch(
        collection,
        docIds
      );
      await this.handleDocumentsFromRemote(
        collection,
        collectionData,
        docsWithRevisions as any
      );
      collectionData.forEach((doc: any) => this._subjects.recieved.next(doc));

      if (collectionData.length > 0) {
        // the last document of the page becomes this collection's checkpoint
        const newLatestDocument = collectionData[collectionData.length - 1];
        await setLastPullDocument(collection, this.endpointHash, newLatestDocument);
      }
    }

    this._subjects.successfulPull.next(true);

    if (modified.length > 0) {
      // we have more docs, re-run
      return this.runPull();
    }

    // no more docs: in live mode the next pull is triggered from outside
    return true;
  }

  /**
   * Pushes the local changes of every collection to the endpoint, one
   * document at a time, and stores the push checkpoint per collection.
   * Runs again until no collection reports changes; the result of that
   * follow-up run is returned, so a failure in a later batch is reported too.
   * @return true if successful, false if not
   */
  async runPush(): Promise<boolean> {
    let changesCounter = 0;

    for (const collection of this.collections) {
      const changes = await getChangesSinceLastPushSequence(
        collection,
        this.endpointHash,
        this.lastPulledRevField,
        this.push.batchSize,
        this.syncRevisions
      );

      changesCounter += changes.results.length;

      const changesWithDocs: SecDataType[] = (
        await Promise.all(
          changes.results.map(async (change: any) => {
            let doc = change['doc'];

            // translate the pouch deleted state into the remote deleted flag
            doc[this.deletedFlag] = !!change['deleted'];
            delete doc._deleted;
            delete doc._attachments;
            delete doc[this.lastPulledRevField];

            if (!this.syncRevisions) {
              delete doc._rev;
            }

            // a falsy modifier result means: do not push this document
            doc = await (this.push as any).modifier(doc);
            if (!doc) {
              return null;
            }
            doc['itemType'] = collection.name;

            const seq = change.seq;
            return {
              doc,
              seq,
            };
          })
        )
      ).filter((Boolean as any) as ExcludesNull);

      let lastSuccessfullChange = null;
      try {
        /**
         * we cannot run all queries parallel
         * because then we would not know
         * where to start again on errors
         * so we run through the docs in series
         */
        for (let i = 0; i < changesWithDocs.length; i++) {
          const changeWithDoc = changesWithDocs[i];
          const pushObj = await this.push.queryBuilder(changeWithDoc.doc);
          const result = await this.client.query(pushObj.query, pushObj.variables);
          if (result.errors || result.data === undefined || result.data === null) {
            this.throwErrorOnResponse(result);
          } else {
            this._subjects.send.next(changeWithDoc.doc);
            lastSuccessfullChange = changeWithDoc;
          }
        }
      } catch (err) {
        // remember how far we got, so the next run resumes after the last success
        if (lastSuccessfullChange) {
          await setLastPushSequence(
            collection,
            this.endpointHash,
            lastSuccessfullChange.seq
          );
        }
        this._subjects.error.next(err);
        return false;
      }

      if (changes.results.length) {
        // all docs were successful, so we use the seq of the changes-fetch
        await setLastPushSequence(collection, this.endpointHash, changes.last_seq);
      }
    }

    if (changesCounter > 0) {
      // we have more docs, re-run and report that run's outcome
      return this.runPush();
    }

    // no more docs to push: in live mode the next push is triggered from outside
    return true;
  }

  /**
   * Writes pulled documents into the pouch of `collection` with
   * `new_edits: false` and emits a change event for each of them.
   *
   * Without `syncRevisions` a revision is generated locally with
   * `createRevisionForPulledDocument`; its height is one above the stored
   * revision (or 1 for unknown documents). With `syncRevisions` the `_rev`
   * of the document is copied into the `lastPulledRevField`.
   *
   * @param collection target collection
   * @param docs pulled documents, still carrying the remote deleted flag
   * @param docsWithRevisions lookup of the stored revision state, indexed by primary key
   */
  async handleDocumentsFromRemote(
    collection: RxCollection,
    docs: any[],
    docsWithRevisions: any[]
  ) {
    const toPouchDocs = [];
    for (const doc of docs) {
      const deletedValue = doc[this.deletedFlag];
      const toPouch = collection._handleToPouch(doc);
      toPouch._deleted = deletedValue;
      delete toPouch[this.deletedFlag];

      if (!this.syncRevisions) {
        const primaryValue = toPouch._id;

        const pouchState = docsWithRevisions[primaryValue];
        let newRevision = createRevisionForPulledDocument(
          this.endpointHash,
          toPouch
        );
        if (pouchState) {
          const newRevisionHeight = pouchState.revisions.start + 1;
          const revisionId = newRevision;
          newRevision = newRevisionHeight + '-' + newRevision;
          toPouch._revisions = {
            start: newRevisionHeight,
            ids: pouchState.revisions.ids,
          };
          toPouch._revisions.ids.unshift(revisionId);
        } else {
          newRevision = '1-' + newRevision;
        }

        toPouch._rev = newRevision;
      } else {
        toPouch[this.lastPulledRevField] = toPouch._rev;
      }

      toPouchDocs.push({
        doc: toPouch,
        deletedValue,
      });
    }
    const startTime = now();
    await collection.pouch.bulkDocs(
      toPouchDocs.map((tpd) => tpd.doc),
      {
        new_edits: false,
      }
    );
    const endTime = now();

    /**
     * because bulkDocs with new_edits: false
     * does not stream changes to the pouchdb,
     * we create the event and emit it,
     * so other instances get informed about it
     */
    for (const tpd of toPouchDocs) {
      const originalDoc = flatClone(tpd.doc);

      if (tpd.deletedValue) {
        originalDoc._deleted = tpd.deletedValue;
      } else {
        delete originalDoc._deleted;
      }
      delete originalDoc[this.deletedFlag];
      delete originalDoc._revisions;

      const cE = changeEventfromPouchChange(
        originalDoc,
        collection,
        startTime,
        endTime
      );
      collection.$emit(cE);
    }
  }

  /**
   * Stops the replication: clears pending retry timers, unsubscribes the
   * registered subscriptions and emits `canceled = true`.
   * @returns resolves to false when the replication was already stopped, true otherwise
   */
  cancel(): Promise<any> {
    if (this.isStopped()) return Promise.resolve(false);
    this.runTimeouts.forEach((t) => clearTimeout(t));
    // drop the cleared handles so the set does not keep stale entries
    this.runTimeouts.clear();
    this._subs.forEach((sub) => sub.unsubscribe());
    this._subjects.canceled.next(true);
    return Promise.resolve(true);
  }

  /**
   * Replaces the request headers by creating a new GraphQL client and keeps
   * the public `headers` property in sync with it.
   * @param headers headers to send with all following requests
   */
  setHeaders(headers: { [k: string]: string }): void {
    this.headers = headers;
    this.client = GraphQLClient({
      url: this.url,
      headers,
    });
  }
}

/**
 * Options accepted by `syncGraphQL()`.
 * The defaults mentioned below are the ones `syncGraphQL()` applies when an
 * option is omitted.
 */
export type SyncOptionsGraphQL = {
  /** Names of the collections to sync; all collections of the database when omitted. */
  collectionNames?: string[];
  /** URL of the GraphQL endpoint. */
  url: string;
  /** Sent with all requests to the endpoint. */
  headers?: Record<string, string>;
  /** Wait for this instance to become leader before starting; default=true. */
  waitForLeadership?: boolean;
  /** Pull configuration (query builder, optional modifier). */
  pull: GraphQLSyncPullOptions;
  /** Push configuration (query builder, optional modifier, batch size). */
  push: GraphQLSyncPushOptions;
  /** Name of the document field that holds the deleted state on the remote side. */
  deletedFlag: string;
  /** Name of the field used to find the newest pull checkpoint across collections. */
  tsField: string;
  /** Keep replicating after the initial cycle; default=false. */
  live?: boolean;
  /** Time in ms between two live pulls. */
  liveInterval?: number;
  /** Time in ms to wait before retrying a failed run. */
  retryTime?: number;
  /** How many times to retry before giving up. */
  numRetries?: number;
  /** If this is false, the replication does nothing at start. */
  autoStart?: boolean;
  /** Exchange revisions with the endpoint instead of generating them locally; default=false. */
  syncRevisions?: boolean;
  /** Name of the field that stores the last pulled revision; default='last_pulled_rev'. */
  lastPulledRevField?: string;
};

/**
 * Starts a GraphQL replication for collections of this database.
 * Installed on the RxDatabase prototype, so it is called as `db.syncGraphQL(options)`.
 *
 * The replication state is returned synchronously; unless `autoStart` is
 * false, the first run and the live handlers are started asynchronously
 * (after leadership was obtained when `waitForLeadership` is set). Errors
 * raised during that asynchronous start-up are emitted on the state's
 * `error` subject.
 *
 * Note: missing `modifier` functions are filled in on the passed `pull` and
 * `push` objects themselves.
 *
 * @param options see `SyncOptionsGraphQL`
 * @returns the replication state that controls and reports on the sync
 * @throws Error when `collectionNames` contains a name that is not a
 *   collection of this database
 */
export function syncGraphQL(
  this: RxDatabase,
  {
    url,
    collectionNames,
    headers = {},
    waitForLeadership = true,
    pull,
    push,
    deletedFlag,
    tsField,
    lastPulledRevField = 'last_pulled_rev',
    live = false,
    liveInterval = 1000 * 10, // in ms
    retryTime = 1000 * 4, // in ms
    numRetries = 4, // how many times to retry before giving up
    autoStart = true, // if this is false, the replication does nothing at start
    syncRevisions = false,
  }: SyncOptionsGraphQL
): RxGraphQLReplicationState {
  // eslint-disable-next-line @typescript-eslint/no-this-alias
  const db = this;

  // fill in defaults for pull & push
  if (pull) {
    if (!pull.modifier) pull.modifier = DEFAULT_MODIFIER;
  }
  if (push) {
    if (!push.modifier) push.modifier = DEFAULT_MODIFIER;
  }

  // resolve the collections; fail early and clearly on an unknown name
  // instead of crashing later on an undefined collection
  const collections: RxCollection[] = ((!collectionNames
    ? Object.values(db.collections)
    : collectionNames.map((key) => {
        const collection = db.collections[key];
        if (!collection) {
          throw new Error(`syncGraphQL: unknown collection "${key}"`);
        }
        return collection;
      })) as unknown) as RxCollection[];

  for (const collection of collections) {
    // ensure the collection is listening to plain-pouchdb writes
    collection.watchForChanges();
  }

  const replicationState = new RxGraphQLReplicationState(
    collections,
    url,
    headers,
    pull,
    push,
    deletedFlag,
    lastPulledRevField,
    tsField,
    live,
    liveInterval,
    retryTime,
    numRetries,
    syncRevisions
  );

  if (!autoStart) return replicationState;

  // run internal so .sync() does not have to be async
  const waitTillRun: any = waitForLeadership
    ? db.waitForLeadership()
    : promiseWait(0);
  waitTillRun
    .then(() => {
      // trigger run once
      replicationState.run();

      // start sync-interval
      if (replicationState.live) {
        if (pull) {
          (async () => {
            while (!replicationState.isStopped()) {
              await promiseWait(replicationState.liveInterval);
              if (replicationState.isStopped()) return;
              await replicationState.run(
                // do not retry on liveInterval-runs because they might stack up
                // when failing
                false
              );
            }
          })();
        }

        if (push) {
          /**
           * we have to use the rxdb changestream
           * because the pouchdb.changes stream sometimes
           * does not emit events or gets stuck
           */

          for (const collection of collections) {
            const changeEventsSub = collection.$.subscribe((changeEvent) => {
              if (replicationState.isStopped()) return;
              const rev = changeEvent.documentData._rev;
              // writes that came from our own pull must not trigger a push
              if (
                rev &&
                !wasRevisionfromPullReplication(
                  replicationState.endpointHash,
                  changeEvent.documentData,
                  lastPulledRevField
                )
              ) {
                replicationState.run();
              }
            });
            replicationState._subs.push(changeEventsSub);
          }
        }
      }
    })
    .catch((err: unknown) => {
      // surface start-up failures instead of leaving an unhandled rejection
      replicationState._subjects.error.next(err);
    });

  return replicationState;
}

export * from './helper';
export * from './crawling-checkpoint';
export * from './graphql-schema-from-rx-schema';
export * from './query-builder-from-rx-schema';

/** Flag stored in the plugin definition's `rxdb` field. */
export const rxdb = true;

/**
 * Prototype hooks of the plugin.
 * `syncGraphQL` is attached to the database prototype (not to RxCollection),
 * because one replication spans several collections.
 */
export const prototypes = {
  RxDatabase(proto: any) {
    proto.syncGraphQL = syncGraphQL;
  },
};

/** Plugin definition to hand to `addRxPlugin()`. */
export const RxDBReplicationGraphQLPlugin: RxPlugin = {
  name: 'replication-graphql',
  rxdb: rxdb,
  prototypes: prototypes,
};
