# Copyright 2018 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Configuration base class and utilities."""

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import httpx
import yaml
from huggingface_hub import is_offline_mode, model_info
from huggingface_hub.errors import OfflineModeIsEnabled
from huggingface_hub.utils import HFValidationError

from . import __version__
from .models.auto.modeling_auto import (
    MODEL_FOR_AUDIO_CLASSIFICATION_MAPPING_NAMES,
    MODEL_FOR_CAUSAL_LM_MAPPING_NAMES,
    MODEL_FOR_CTC_MAPPING_NAMES,
    MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING_NAMES,
    MODEL_FOR_IMAGE_SEGMENTATION_MAPPING_NAMES,
    MODEL_FOR_IMAGE_TEXT_TO_TEXT_MAPPING_NAMES,
    MODEL_FOR_MASKED_LM_MAPPING_NAMES,
    MODEL_FOR_OBJECT_DETECTION_MAPPING_NAMES,
    MODEL_FOR_QUESTION_ANSWERING_MAPPING_NAMES,
    MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING_NAMES,
    MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING_NAMES,
    MODEL_FOR_SPEECH_SEQ_2_SEQ_MAPPING_NAMES,
    MODEL_FOR_TABLE_QUESTION_ANSWERING_MAPPING_NAMES,
    MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING_NAMES,
    MODEL_FOR_ZERO_SHOT_IMAGE_CLASSIFICATION_MAPPING_NAMES,
)
from .training_args import ParallelMode
from .utils import (
    is_datasets_available,
    is_tokenizers_available,
    is_torch_available,
    logging,
)


TASK_MAPPING = {
    "text-generation": MODEL_FOR_CAUSAL_LM_MAPPING_NAMES,
    "image-classification": MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING_NAMES,
    "image-segmentation": MODEL_FOR_IMAGE_SEGMENTATION_MAPPING_NAMES,
    "fill-mask": MODEL_FOR_MASKED_LM_MAPPING_NAMES,
    "object-detection": MODEL_FOR_OBJECT_DETECTION_MAPPING_NAMES,
    "question-answering": MODEL_FOR_QUESTION_ANSWERING_MAPPING_NAMES,
    "text2text-generation": MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING_NAMES,
    "text-classification": MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING_NAMES,
    "table-question-answering": MODEL_FOR_TABLE_QUESTION_ANSWERING_MAPPING_NAMES,
    "token-classification": MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING_NAMES,
    "audio-classification": MODEL_FOR_AUDIO_CLASSIFICATION_MAPPING_NAMES,
    "automatic-speech-recognition": {**MODEL_FOR_CTC_MAPPING_NAMES, **MODEL_FOR_SPEECH_SEQ_2_SEQ_MAPPING_NAMES},
    "zero-shot-image-classification": MODEL_FOR_ZERO_SHOT_IMAGE_CLASSIFICATION_MAPPING_NAMES,
    "image-text-to-text": MODEL_FOR_IMAGE_TEXT_TO_TEXT_MAPPING_NAMES,
}

logger = logging.get_logger(__name__)


AUTOGENERATED_TRAINER_COMMENT = """
<!-- This model card has been generated automatically according to the information the Trainer had access to. You
should probably proofread and complete it, then remove this comment. -->
"""


TASK_TAG_TO_NAME_MAPPING = {
    "fill-mask": "Masked Language Modeling",
    "image-classification": "Image Classification",
    "image-segmentation": "Image Segmentation",
    "multiple-choice": "Multiple Choice",
    "object-detection": "Object Detection",
    "question-answering": "Question Answering",
    "summarization": "Summarization",
    "table-question-answering": "Table Question Answering",
    "text-classification": "Text Classification",
    "text-generation": "Causal Language Modeling",
    "text2text-generation": "Sequence-to-sequence Language Modeling",
    "token-classification": "Token Classification",
    "translation": "Translation",
    "zero-shot-classification": "Zero Shot Classification",
    "automatic-speech-recognition": "Automatic Speech Recognition",
    "audio-classification": "Audio Classification",
}


METRIC_TAGS = [
    "accuracy",
    "bleu",
    "f1",
    "matthews_correlation",
    "pearsonr",
    "precision",
    "recall",
    "rouge",
    "sacrebleu",
    "spearmanr",
    "wer",
]


def _listify(obj):
    if obj is None:
        return []
    elif isinstance(obj, str):
        return [obj]
    else:
        return obj


def _insert_values_as_list(metadata, name, values):
    if values is None:
        return metadata
    if isinstance(values, str):
        values = [values]
    values = [v for v in values if v is not None]
    if len(values) == 0:
        return metadata
    metadata[name] = values
    return metadata


def infer_metric_tags_from_eval_results(eval_results):
    if eval_results is None:
        return {}
    result = {}
    for key in eval_results:
        if key.lower().replace(" ", "_") in METRIC_TAGS:
            result[key.lower().replace(" ", "_")] = key
        elif key.lower() == "rouge1":
            result["rouge"] = key
    return result


def _insert_value(metadata, name, value):
    if value is None:
        return metadata
    metadata[name] = value
    return metadata


def is_hf_dataset(dataset):
    if not is_datasets_available():
        return False

    from datasets import Dataset, IterableDataset

    return isinstance(dataset, (Dataset, IterableDataset))


def _get_mapping_values(mapping):
    result = []
    for v in mapping.values():
        if isinstance(v, (tuple, list)):
            result += list(v)
        else:
            result.append(v)
    return result


@dataclass
class TrainingSummary:
    model_name: str
    language: str | list[str] | None = None
    license: str | None = None
    tags: str | list[str] | None = None
    finetuned_from: str | None = None
    tasks: str | list[str] | None = None
    dataset: str | list[str] | None = None
    dataset_tags: str | list[str] | None = None
    dataset_args: str | list[str] | None = None
    dataset_metadata: dict[str, Any] | None = None
    eval_results: dict[str, float] | None = None
    eval_lines: list[str] | None = None
    hyperparameters: dict[str, Any] | None = None
    source: str | None = "trainer"

    def __post_init__(self):
        # Infer default license from the checkpoint used, if possible.
        if (
            self.license is None
            and not is_offline_mode()
            and self.finetuned_from is not None
            and len(self.finetuned_from) > 0
        ):
            try:
                info = model_info(self.finetuned_from)
                for tag in info.tags:
                    if tag.startswith("license:"):
                        self.license = tag[8:]
            except (httpx.HTTPError, HFValidationError, OfflineModeIsEnabled):
                pass

    def create_model_index(self, metric_mapping):
        model_index = {"name": self.model_name}

        # Dataset mapping tag -> name
        dataset_names = _listify(self.dataset)
        dataset_tags = _listify(self.dataset_tags)
        dataset_args = _listify(self.dataset_args)
        dataset_metadata = _listify(self.dataset_metadata)
        if len(dataset_args) < len(dataset_tags):
            dataset_args = dataset_args + [None] * (len(dataset_tags) - len(dataset_args))
        dataset_mapping = dict(zip(dataset_tags, dataset_names))
        dataset_arg_mapping = dict(zip(dataset_tags, dataset_args))
        dataset_metadata_mapping = dict(zip(dataset_tags, dataset_metadata))

        task_mapping = {
            task: TASK_TAG_TO_NAME_MAPPING[task] for task in _listify(self.tasks) if task in TASK_TAG_TO_NAME_MAPPING
        }

        model_index["results"] = []

        if len(task_mapping) == 0 and len(dataset_mapping) == 0:
            return [model_index]
        if len(task_mapping) == 0:
            task_mapping = {None: None}
        if len(dataset_mapping) == 0:
            dataset_mapping = {None: None}

        # One entry per dataset and per task
        all_possibilities = [(task_tag, ds_tag) for task_tag in task_mapping for ds_tag in dataset_mapping]
        for task_tag, ds_tag in all_possibilities:
            result = {}
            if task_tag is not None:
                result["task"] = {"name": task_mapping[task_tag], "type": task_tag}

            if ds_tag is not None:
                metadata = dataset_metadata_mapping.get(ds_tag, {})
                result["dataset"] = {
                    "name": dataset_mapping[ds_tag],
                    "type": ds_tag,
                    **metadata,
                }
                if dataset_arg_mapping[ds_tag] is not None:
                    result["dataset"]["args"] = dataset_arg_mapping[ds_tag]

            if len(metric_mapping) > 0:
                result["metrics"] = []
                for metric_tag, metric_name in metric_mapping.items():
                    result["metrics"].append(
                        {
                            "name": metric_name,
                            "type": metric_tag,
                            "value": self.eval_results[metric_name],
                        }
                    )

            # Remove partial results to avoid the model card being rejected.
            if "task" in result and "dataset" in result and "metrics" in result:
                model_index["results"].append(result)
            else:
                logger.info(f"Dropping the following result as it does not have all the necessary fields:\n{result}")

        return [model_index]

    def create_metadata(self):
        metric_mapping = infer_metric_tags_from_eval_results(self.eval_results)

        metadata = {}
        metadata = _insert_value(metadata, "library_name", "transformers")
        metadata = _insert_values_as_list(metadata, "language", self.language)
        metadata = _insert_value(metadata, "license", self.license)
        if self.finetuned_from is not None and isinstance(self.finetuned_from, str) and len(self.finetuned_from) > 0:
            metadata = _insert_value(metadata, "base_model", self.finetuned_from)
        metadata = _insert_values_as_list(metadata, "tags", self.tags)
        metadata = _insert_values_as_list(metadata, "datasets", self.dataset_tags)
        metadata = _insert_values_as_list(metadata, "metrics", list(metric_mapping.keys()))
        metadata["model-index"] = self.create_model_index(metric_mapping)

        return metadata

    def to_model_card(self):
        model_card = ""

        metadata = yaml.dump(self.create_metadata(), sort_keys=False)
        if len(metadata) > 0:
            model_card = f"---\n{metadata}---\n"

        # Now the model card for realsies.
        if self.source == "trainer":
            model_card += AUTOGENERATED_TRAINER_COMMENT

        model_card += f"\n# {self.model_name}\n\n"

        if self.finetuned_from is None:
            model_card += "This model was trained from scratch on "
        else:
            model_card += (
                "This model is a fine-tuned version of"
                f" [{self.finetuned_from}](https://huggingface.co/{self.finetuned_from}) on "
            )

        if self.dataset is None or (isinstance(self.dataset, list) and len(self.dataset) == 0):
            model_card += "an unknown dataset."
        else:
            if isinstance(self.dataset, str):
                model_card += f"the {self.dataset} dataset."
            elif isinstance(self.dataset, (tuple, list)) and len(self.dataset) == 1:
                model_card += f"the {self.dataset[0]} dataset."
            else:
                model_card += (
                    ", ".join([f"the {ds}" for ds in self.dataset[:-1]]) + f" and the {self.dataset[-1]} datasets."
                )

        if self.eval_results is not None:
            model_card += "\nIt achieves the following results on the evaluation set:\n"
            model_card += "\n".join([f"- {name}: {_maybe_round(value)}" for name, value in self.eval_results.items()])
        model_card += "\n"

        model_card += "\n## Model description\n\nMore information needed\n"
        model_card += "\n## Intended uses & limitations\n\nMore information needed\n"
        model_card += "\n## Training and evaluation data\n\nMore information needed\n"

        model_card += "\n## Training procedure\n"
        model_card += "\n### Training hyperparameters\n"
        if self.hyperparameters is not None:
            model_card += "\nThe following hyperparameters were used during training:\n"
            model_card += "\n".join([f"- {name}: {value}" for name, value in self.hyperparameters.items()])
            model_card += "\n"
        else:
            model_card += "\nMore information needed\n"

        if self.eval_lines is not None:
            model_card += "\n### Training results\n\n"
            model_card += make_markdown_table(self.eval_lines)
            model_card += "\n"

        model_card += "\n### Framework versions\n\n"
        model_card += f"- Transformers {__version__}\n"

        if self.source == "trainer" and is_torch_available():
            import torch

            model_card += f"- Pytorch {torch.__version__}\n"
        if is_datasets_available():
            import datasets

            model_card += f"- Datasets {datasets.__version__}\n"
        if is_tokenizers_available():
            import tokenizers

            model_card += f"- Tokenizers {tokenizers.__version__}\n"

        return model_card

    @classmethod
    def from_trainer(
        cls,
        trainer,
        language=None,
        license=None,
        tags=None,
        model_name=None,
        finetuned_from=None,
        tasks=None,
        dataset_tags=None,
        dataset_metadata=None,
        dataset=None,
        dataset_args=None,
    ):
        # Infer default from dataset
        one_dataset = trainer.eval_dataset if trainer.eval_dataset is not None else trainer.train_dataset
        if is_hf_dataset(one_dataset) and (dataset_tags is None or dataset_args is None or dataset_metadata is None):
            default_tag = one_dataset.builder_name
            # Those are not real datasets from the Hub so we exclude them.
            if default_tag not in ["csv", "json", "pandas", "parquet", "text"]:
                if dataset_metadata is None:
                    dataset_metadata = [{"config": one_dataset.config_name, "split": str(one_dataset.split)}]
                if dataset_tags is None:
                    dataset_tags = [default_tag]
                if dataset_args is None:
                    dataset_args = [one_dataset.config_name]

        if dataset is None and dataset_tags is not None:
            dataset = dataset_tags

        # Infer default finetuned_from
        if (
            finetuned_from is None
            and hasattr(trainer.model.config, "_name_or_path")
            and not os.path.isdir(trainer.model.config._name_or_path)
        ):
            finetuned_from = trainer.model.config._name_or_path

        # Infer default task tag:
        if tasks is None:
            model_class_name = trainer.model.__class__.__name__
            for task, mapping in TASK_MAPPING.items():
                if model_class_name in _get_mapping_values(mapping):
                    tasks = task

        if model_name is None:
            model_name = Path(trainer.args.output_dir).name
        if len(model_name) == 0:
            model_name = finetuned_from

        # Add `generated_from_trainer` to the tags
        if tags is None:
            tags = ["generated_from_trainer"]
        elif isinstance(tags, str) and tags != "generated_from_trainer":
            tags = [tags, "generated_from_trainer"]
        elif "generated_from_trainer" not in tags:
            tags.append("generated_from_trainer")

        _, eval_lines, eval_results = parse_log_history(trainer.state.log_history)
        hyperparameters = extract_hyperparameters_from_trainer(trainer)

        return cls(
            language=language,
            license=license,
            tags=tags,
            model_name=model_name,
            finetuned_from=finetuned_from,
            tasks=tasks,
            dataset=dataset,
            dataset_tags=dataset_tags,
            dataset_args=dataset_args,
            dataset_metadata=dataset_metadata,
            eval_results=eval_results,
            eval_lines=eval_lines,
            hyperparameters=hyperparameters,
        )


def parse_log_history(log_history):
    """
    Parse the `log_history` of a Trainer to get the intermediate and final evaluation results.
    """
    idx = 0
    while idx < len(log_history) and "train_runtime" not in log_history[idx]:
        idx += 1

    # If there are no training logs
    if idx == len(log_history):
        idx -= 1
        while idx >= 0 and "eval_loss" not in log_history[idx]:
            idx -= 1

        if idx >= 0:
            return None, None, log_history[idx]
        else:
            return None, None, None

    # From now one we can assume we have training logs:
    train_log = log_history[idx]
    lines = []
    training_loss = "No log"
    for i in range(idx):
        if "loss" in log_history[i]:
            training_loss = log_history[i]["loss"]
        if "eval_loss" in log_history[i]:
            metrics = log_history[i].copy()
            _ = metrics.pop("total_flos", None)
            epoch = metrics.pop("epoch", None)
            step = metrics.pop("step", None)
            _ = metrics.pop("eval_runtime", None)
            _ = metrics.pop("eval_samples_per_second", None)
            _ = metrics.pop("eval_steps_per_second", None)
            values = {"Training Loss": training_loss, "Epoch": epoch, "Step": step}
            for k, v in metrics.items():
                if k == "eval_loss":
                    values["Validation Loss"] = v
                else:
                    splits = k.split("_")
                    name = " ".join([part.capitalize() for part in splits[1:]])
                    values[name] = v
            lines.append(values)

    idx = len(log_history) - 1
    while idx >= 0 and "eval_loss" not in log_history[idx]:
        idx -= 1

    if idx > 0:
        eval_results = {}
        for key, value in log_history[idx].items():
            key = key.removeprefix("eval_")
            if key not in ["runtime", "samples_per_second", "steps_per_second", "epoch", "step"]:
                camel_cased_key = " ".join([part.capitalize() for part in key.split("_")])
                eval_results[camel_cased_key] = value
        return train_log, lines, eval_results
    else:
        return train_log, lines, None


def _maybe_round(v, decimals=4):
    if isinstance(v, float) and len(str(v).split(".")) > 1 and len(str(v).split(".")[1]) > decimals:
        return f"{v:.{decimals}f}"
    return str(v)


def _regular_table_line(values, col_widths):
    values_with_space = [f"| {v}" + " " * (w - len(v) + 1) for v, w in zip(values, col_widths)]
    return "".join(values_with_space) + "|\n"


def _second_table_line(col_widths):
    values = ["|:" + "-" * w + ":" for w in col_widths]
    return "".join(values) + "|\n"


def make_markdown_table(lines):
    """
    Create a nice Markdown table from the results in `lines`.
    """
    if lines is None or len(lines) == 0:
        return ""
    col_widths = {key: len(str(key)) for key in lines[0]}
    for line in lines:
        for key, value in line.items():
            if col_widths[key] < len(_maybe_round(value)):
                col_widths[key] = len(_maybe_round(value))

    table = _regular_table_line(list(lines[0].keys()), list(col_widths.values()))
    table += _second_table_line(list(col_widths.values()))
    for line in lines:
        table += _regular_table_line([_maybe_round(v) for v in line.values()], list(col_widths.values()))
    return table


_TRAINING_ARGS_KEYS = [
    "learning_rate",
    "train_batch_size",
    "eval_batch_size",
    "seed",
]


def extract_hyperparameters_from_trainer(trainer):
    hyperparameters = {k: getattr(trainer.args, k) for k in _TRAINING_ARGS_KEYS}

    if trainer.args.parallel_mode not in [ParallelMode.NOT_PARALLEL, ParallelMode.NOT_DISTRIBUTED]:
        hyperparameters["distributed_type"] = (
            "multi-GPU" if trainer.args.parallel_mode == ParallelMode.DISTRIBUTED else trainer.args.parallel_mode.value
        )
    if trainer.args.world_size > 1:
        hyperparameters["num_devices"] = trainer.args.world_size
    if trainer.args.gradient_accumulation_steps > 1:
        hyperparameters["gradient_accumulation_steps"] = trainer.args.gradient_accumulation_steps

    total_train_batch_size = (
        trainer.args.train_batch_size * trainer.args.world_size * trainer.args.gradient_accumulation_steps
    )
    if total_train_batch_size != hyperparameters["train_batch_size"]:
        hyperparameters["total_train_batch_size"] = total_train_batch_size
    total_eval_batch_size = trainer.args.eval_batch_size * trainer.args.world_size
    if total_eval_batch_size != hyperparameters["eval_batch_size"]:
        hyperparameters["total_eval_batch_size"] = total_eval_batch_size

    if trainer.args.optim:
        optimizer_name = trainer.args.optim
        optimizer_args = trainer.args.optim_args if trainer.args.optim_args else "No additional optimizer arguments"

        if "adam" in optimizer_name.lower():
            hyperparameters["optimizer"] = (
                f"Use {optimizer_name} with betas=({trainer.args.adam_beta1},{trainer.args.adam_beta2}) and"
                f" epsilon={trainer.args.adam_epsilon} and optimizer_args={optimizer_args}"
            )
        else:
            hyperparameters["optimizer"] = f"Use {optimizer_name} and the args are:\n{optimizer_args}"

    hyperparameters["lr_scheduler_type"] = trainer.args.lr_scheduler_type.value
    if trainer.args.warmup_steps != 0.0:
        hyperparameters["lr_scheduler_warmup_steps"] = trainer.args.warmup_steps
    if trainer.args.max_steps != -1:
        hyperparameters["training_steps"] = trainer.args.max_steps
    else:
        hyperparameters["num_epochs"] = trainer.args.num_train_epochs

    if trainer.args.fp16:
        hyperparameters["mixed_precision_training"] = "Native AMP"

    if trainer.args.label_smoothing_factor != 0.0:
        hyperparameters["label_smoothing_factor"] = trainer.args.label_smoothing_factor

    return hyperparameters
