from abc import abstractmethod
from datetime import date, datetime, timedelta
from typing import Any, Optional, Sequence
from uuid import UUID
from pydantic import BaseModel
import enlyze.models as user_models
[docs]
class Site(PlatformApiModel):
    """Platform API representation of a site."""

    # Identifier of the site.
    uuid: UUID
    # Name of the site as delivered by the API.
    name: str

    def to_user_model(self) -> user_models.Site:
        """Convert into a :ref:`user model <user_models>`.

        The API field ``name`` is passed to the user model as
        ``display_name``.
        """
        site = user_models.Site(uuid=self.uuid, display_name=self.name)
        return site
[docs]
class Machine(PlatformApiModel):
    """Platform API representation of a machine."""

    name: str
    uuid: UUID
    genesis_date: date
    # UUID reference to a site; the user-model site object is supplied by the
    # caller of ``to_user_model``.
    site: UUID

    def to_user_model(self, site: user_models.Site) -> user_models.Machine:
        """Convert into a :ref:`user model <user_models>`.

        :param site: User-model site to attach to the resulting machine. It
            is used as given; ``self.site`` is not consulted here.

        :returns: User-model machine whose ``display_name`` is taken from
            ``name``.
        """
        machine = user_models.Machine(
            uuid=self.uuid,
            display_name=self.name,
            genesis_date=self.genesis_date,
            site=site,
        )
        return machine
[docs]
class Variable(PlatformApiModel):
    """Platform API representation of a variable."""

    uuid: UUID
    display_name: Optional[str]
    unit: Optional[str]
    data_type: user_models.VariableDataType

    def to_user_model(self, machine: user_models.Machine) -> user_models.Variable:
        """Convert into a :ref:`user model <user_models>`.

        :param machine: User-model machine to attach to the resulting
            variable.

        :returns: User-model variable carrying this model's fields unchanged
            plus the given ``machine``.
        """
        variable = user_models.Variable(
            uuid=self.uuid,
            display_name=self.display_name,
            unit=self.unit,
            data_type=self.data_type,
            machine=machine,
        )
        return variable
[docs]
class TimeseriesData(PlatformApiModel):
    """Platform API representation of timeseries data.

    ``columns`` holds the column names and ``records`` the rows. ``merge``
    compares the first element of each record as its timestamp and skips the
    first column name when combining columns.
    """

    columns: list[str]
    records: list[Any]

    def extend(self, other: "TimeseriesData") -> None:
        """Add records from ``other`` after the existing records."""
        self.records.extend(other.records)

    def merge(self, other: "TimeseriesData") -> "TimeseriesData":
        """Merge records from ``other`` into the existing records.

        Records are paired by position. For every pair the first elements
        (timestamps) must be equal; the remaining elements of the record from
        ``other`` are appended to the matching record of this instance, and
        all but the first column name of ``other`` are appended to
        ``columns``. Surplus records at the end of ``other`` are ignored.

        The merge is all-or-nothing: every check runs before anything is
        modified, so this instance is left untouched when an error is raised.

        :param other: Instance to merge in. It must have at least as many
            records as this instance.

        :raises ValueError: If ``other`` has fewer records than this instance
            or if the timestamps of a pair of records differ.

        :returns: This instance, modified in place.
        """
        slen, olen = len(self.records), len(other.records)
        if olen < slen:
            raise ValueError(
                "Cannot merge. Attempted to merge"
                f" an instance with {olen} records into an instance with {slen}"
                " records. The instance to merge must have a number"
                " of records greater than or equal to the number of records of"
                " the instance you're trying to merge into."
            )

        # Validate all timestamps up front so that a mismatch cannot leave
        # this instance with extended columns and partially merged records.
        # ``zip`` stops after ``slen`` pairs because ``olen >= slen``.
        for s, o in zip(self.records, other.records):
            if s[0] != o[0]:
                raise ValueError(
                    "Cannot merge. Attempted to merge records "
                    f"with mismatched timestamps {s[0]}, {o[0]}"
                )

        self.columns.extend(other.columns[1:])
        for s, o in zip(self.records, other.records):
            s.extend(o[1:])

        return self

    def to_user_model(
        self,
        start: datetime,
        end: datetime,
        variables: Sequence[user_models.Variable],
    ) -> user_models.TimeseriesData:
        """Convert into a :ref:`user model <user_models>`.

        :param start: Passed through as ``start`` of the user model.
        :param end: Passed through as ``end`` of the user model.
        :param variables: Passed through as ``variables`` of the user model.

        :returns: User model that receives this instance's ``columns`` and
            ``records`` as ``_columns`` and ``_records``.
        """
        return user_models.TimeseriesData(
            start=start,
            end=end,
            variables=variables,
            _columns=self.columns,
            _records=self.records,
        )
[docs]
class OEEComponent(PlatformApiModel):
    """Platform API representation of a single OEE component."""

    score: float
    # Integer that ``to_user_model`` interprets as a number of seconds.
    time_loss: int

    def to_user_model(self) -> user_models.OEEComponent:
        """Convert into a :ref:`user model <user_models>`.

        ``time_loss`` is converted from seconds into a
        :class:`~datetime.timedelta`.
        """
        lost_time = timedelta(seconds=self.time_loss)
        return user_models.OEEComponent(score=self.score, time_loss=lost_time)
[docs]
class Product(PlatformApiModel):
    """Platform API representation of a product."""

    uuid: UUID
    external_id: str
    name: Optional[str]

    def to_user_model(self) -> user_models.Product:
        """Convert into a :ref:`user model <user_models>`.

        All three fields are passed through unchanged.
        """
        product = user_models.Product(
            uuid=self.uuid,
            external_id=self.external_id,
            name=self.name,
        )
        return product
[docs]
class Quantity(PlatformApiModel):
    """Platform API representation of a quantity: a value with an optional unit."""

    unit: str | None
    value: float

    def to_user_model(self) -> user_models.Quantity:
        """Convert into a :ref:`user model <user_models>`.

        ``unit`` and ``value`` are passed through unchanged.
        """
        quantity = user_models.Quantity(unit=self.unit, value=self.value)
        return quantity
[docs]
class ProductionRun(PlatformApiModel):
    """Platform API representation of a production run."""

    uuid: UUID
    # UUID reference resolved through ``machines_by_uuid`` in ``to_user_model``.
    machine: UUID
    average_throughput: Optional[float]
    production_order: str
    # UUID reference resolved through ``products_by_uuid`` in ``to_user_model``.
    product: UUID
    start: datetime
    end: Optional[datetime]
    quantity_total: Optional[Quantity]
    quantity_scrap: Optional[Quantity]
    quantity_yield: Optional[Quantity]
    availability: Optional[OEEComponent]
    performance: Optional[OEEComponent]
    quality: Optional[OEEComponent]
    productivity: Optional[OEEComponent]

    def to_user_model(
        self,
        machines_by_uuid: dict[UUID, user_models.Machine],
        products_by_uuid: dict[UUID, user_models.Product],
    ) -> user_models.ProductionRun:
        """Convert into a :ref:`user model <user_models>`.

        :param machines_by_uuid: Lookup used to resolve ``machine`` into a
            user-model machine.
        :param products_by_uuid: Lookup used to resolve ``product`` into a
            user-model product.

        :raises KeyError: If ``machine`` or ``product`` is missing from its
            lookup.

        :returns: User-model production run. Nested quantities and OEE
            components are converted too; unset ones stay ``None``.
        """

        def convert(nested: Any) -> Any:
            # Unset nested models are passed on as ``None``.
            if nested:
                return nested.to_user_model()
            return None

        # Nested conversions run before the dictionary lookups below.
        total = convert(self.quantity_total)
        scrap = convert(self.quantity_scrap)
        yielded = convert(self.quantity_yield)
        availability = convert(self.availability)
        performance = convert(self.performance)
        quality = convert(self.quality)
        productivity = convert(self.productivity)

        machine = machines_by_uuid[self.machine]
        product = products_by_uuid[self.product]

        return user_models.ProductionRun(
            uuid=self.uuid,
            machine=machine,
            average_throughput=self.average_throughput,
            production_order=self.production_order,
            product=product,
            start=self.start,
            end=self.end,
            quantity_total=total,
            quantity_scrap=scrap,
            quantity_yield=yielded,
            availability=availability,
            performance=performance,
            quality=quality,
            productivity=productivity,
        )