mm.Context

Multimodal context backed by a Rust core.

When constructed without a root (Context() / Context(session_id=...)) the context is incremental: use add() to attach free-form strings, pathlib.Path objects, or PIL.Image.Image instances under a chat role, then to_messages() to hand the whole context to a VLM.
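
To make the role-aware hand-off concrete, here is a minimal sketch of how consecutive items that share a role could be folded into one OpenAI-style message, which is the shape the to_messages docstring describes for format="openai". It is an illustration only, not the library's implementation: group_by_role_runs and the simplified text-only item dicts are hypothetical stand-ins, and real image, video, or document items would go through the library's encoders instead.

```python
from itertools import groupby


def group_by_role_runs(items):
    """Fold each run of consecutive same-role items into one message."""
    messages = []
    # groupby only merges *adjacent* items, so insertion order is preserved.
    for role, run in groupby(items, key=lambda item: item["role"]):
        content = [{"type": "text", "text": item["text"]} for item in run]
        messages.append({"role": role, "content": content})
    return messages


# Hypothetical, simplified items (the real ones carry ref ids, kinds, etc.).
items = [
    {"role": "system", "text": "You are a careful captioner."},
    {"role": "user", "text": "Describe this image."},
    {"role": "user", "text": "Keep it under 20 words."},
]
messages = group_by_role_runs(items)
```

With the three items above, the two adjacent user items end up in a single user message after the system message.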

When constructed with a root (Context("~/data")) it scans the directory and exposes the Arrow-backed metadata table (legacy mode). Both modes share the same session_id / refs surface.
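
Because the two modes expose different methods, calling code sometimes needs to know which one it holds. The sketch below assumes the root attribute is None for incremental contexts and a path otherwise, as the constructor in the source listing sets it. context_mode is a hypothetical helper, and SimpleNamespace objects stand in for real Context instances so the snippet runs without mm installed.

```python
from types import SimpleNamespace


def context_mode(ctx) -> str:
    """Return "incremental" or "directory-scan" based on ``ctx.root``."""
    # Assumption: incremental contexts keep root=None, scan contexts a path.
    return "incremental" if ctx.root is None else "directory-scan"


# Stand-ins for Context() and Context("~/data"); not real Context objects.
incremental_ctx = SimpleNamespace(root=None)
scanned_ctx = SimpleNamespace(root="/home/me/data")

modes = [context_mode(incremental_ctx), context_mode(scanned_ctx)]
```

A check like this can be used to choose between items() and files before calling a mode-specific method, since those raise RuntimeError on the wrong kind of context.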

Parameters:

    root (str | Path | None, default None): Directory to scan. If provided, switches to directory-scan mode.
    n_threads (int | None, default None): Optional thread count for the Rust scanner (scan mode only).
    no_ignore (bool, default False): If True, ignore .gitignore exclusions (scan mode only).
    session_id (str | None, default None): External session id. Auto-minted as a UUIDv7 when omitted and root is not provided. Stays None when root is provided without session_id (preserves the legacy default).
    llm_base_url (str | None, default None): Optional override for the LLM base_url.
    llm_api_key (str | None, default None): Optional override for the LLM api_key.
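
The session_id defaulting rule is easy to misread, so here is a small sketch of it as a pure function. resolve_session_id is a hypothetical name, not part of mm. The mint argument stands in for the library's UUIDv7 generator, and it defaults to uuid.uuid4 here only because uuid7 is not in the standard library on most Python versions.

```python
import uuid
from typing import Callable, Optional


def resolve_session_id(
    root: Optional[str],
    session_id: Optional[str],
    mint: Callable[[], str] = lambda: str(uuid.uuid4()),  # stand-in for uuid7
) -> Optional[str]:
    """Mirror the documented rule for the effective session id."""
    if session_id is not None:
        return session_id  # an explicit id always wins
    if root is None:
        return mint()  # incremental mode: auto-mint an id
    return None  # directory-scan mode without an id: legacy default


explicit = resolve_session_id(None, "my-session")
minted = resolve_session_id(None, None)
legacy = resolve_session_id("~/data", None)
```

The last case matters in practice: a directory-scan context with no session id has no refs, so pass session_id explicitly (or use Context.new_session) when refs are needed.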

Examples:

Incremental (role-aware):

    from pathlib import Path

    from mm import Context  # import path assumed from the page title, mm.Context

    ctx = Context()
    prompt = ctx.add("Describe this image.", role="user")
    ref = ctx.add(Path("photo.jpg"), role="user", metadata={"note": "hero shot"})
    messages = ctx.to_messages(format="openai")

Directory-scan (legacy):

    ctx = Context("~/data", session_id="my-session")
    ctx.save()
    row = Context.get("my-session/" + ctx.ref_for("photo.jpg"))

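
The directory-scan example builds a global ref of the form <session_id>/<ref_id> by string concatenation. The sketch below shows one way such a ref could be reduced back to a bare ref id while checking the session, following the behaviour the remove and get docstrings describe. It is not the library's own helper: strip_session_prefix here is a hypothetical re-implementation, the ref id img_a1b2c3 is made up, and splitting on the last slash assumes ref ids never contain one.

```python
from typing import Optional


def strip_session_prefix(ref: str, session_id: Optional[str]) -> str:
    """Return the bare ref id, validating any '<session_id>/' prefix."""
    if "/" not in ref:
        return ref  # already a bare ref id
    # Assumption: ref ids contain no '/', so the last slash is the separator.
    sid, _, bare = ref.rpartition("/")
    if not sid or not bare:
        raise ValueError(f"malformed global ref {ref!r}")
    if session_id is not None and sid != session_id:
        raise ValueError(f"ref {ref!r} belongs to session {sid!r}, not {session_id!r}")
    return bare


bare_from_bare = strip_session_prefix("img_a1b2c3", "my-session")
bare_from_global = strip_session_prefix("my-session/img_a1b2c3", "my-session")
```

A ref from a different session, such as "other/img_a1b2c3", raises ValueError in this sketch instead of being silently resolved against the wrong context.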
Source code in mm/context.py
class Context:
    """Multimodal context backed by a Rust core.

    When constructed **without a** ``root`` (``Context()`` / ``Context(session_id=...)``)
    the context is *incremental*: use :meth:`add` to attach free-form
    strings, ``pathlib.Path`` objects, or ``PIL.Image.Image`` instances
    under a chat role, then :meth:`to_messages` to hand the whole
    context to a VLM.

    When constructed **with a** ``root`` (``Context("~/data")``) it scans
    the directory and exposes the Arrow-backed metadata table (legacy
    mode). Both modes share the same ``session_id`` / ``refs`` surface.

    Args:
        root: Directory to scan. If provided, switches to directory-scan mode.
        n_threads: Optional thread count for the Rust scanner (scan mode only).
        no_ignore: If True, ignore ``.gitignore`` exclusions (scan mode only).
        session_id: External session id. Auto-minted as a UUIDv7 when
            omitted *and* ``root`` is not provided. Stays ``None`` when
            ``root`` is provided without ``session_id`` (preserves the
            legacy default).
        llm_base_url: Optional override for the LLM ``base_url``.
        llm_api_key: Optional override for the LLM ``api_key``.

    Examples:
        Incremental (role-aware)::

            ctx = Context()
            prompt = ctx.add("Describe this image.", role="user")
            ref = ctx.add(Path("photo.jpg"), role="user", metadata={"note": "hero shot"})
            messages = ctx.to_messages(format="openai")

        Directory-scan (legacy)::

            ctx = Context("~/data", session_id="my-session")
            ctx.save()
            row = Context.get("my-session/" + ctx.ref_for("photo.jpg"))
    """

    def __init__(
        self,
        root: str | Path | None = None,
        *,
        n_threads: int | None = None,
        no_ignore: bool = False,
        session_id: str | None = None,
        llm_base_url: str | None = None,
        llm_api_key: str | None = None,
    ):
        self._llm_base_url = llm_base_url
        self._llm_api_key = llm_api_key
        self._db: MmDatabase | None = None

        if root is None:
            # Incremental role-aware mode.
            from mm._mm import PyContext

            sid = session_id if session_id is not None else uuid7()
            self._pyctx: Any = PyContext(session_id=sid)
            self.root: Path | None = None
            self._session_id = sid
            self._no_ignore = False
            self._scanner: Any = None
            self._table: Any = None
            self._refs_cache: dict[str, str] | None = None
        else:
            # Legacy directory-scan mode.
            from mm._mm import Scanner

            self.root = Path(root).resolve()
            self._no_ignore = no_ignore
            self._session_id = session_id
            self._pyctx = None
            self._scanner = Scanner(str(self.root), n_threads, no_ignore=no_ignore)
            self._scanner.scan()
            self._table = self._scanner.to_arrow()
            self._refs_cache: dict[str, str] | None = None

    # ── Shared properties ─────────────────────────────────────────────

    @classmethod
    def new_session(
        cls,
        root: str | Path | None = None,
        **kwargs: Any,
    ) -> Context:
        """Create a Context with a freshly minted session id.

        Convenience for callers who want refs but don't have a session
        id from an upstream system. Legacy signature uses UUIDv4 for
        compatibility; new incremental usage uses UUIDv7 via :func:`uuid7`.
        """
        if root is None:
            return cls(session_id=uuid7(), **kwargs)
        return cls(root, session_id=new_session_id(), **kwargs)

    @property
    def session_id(self) -> str | None:
        """External session id attached to this context, if any."""
        return self._session_id

    @property
    def db(self) -> MmDatabase:
        """Lazy-initialized global database connection."""
        if self._db is None:
            self._db = MmDatabase()
        return self._db

    @property
    def num_files(self) -> int:
        """Number of items (incremental mode) or scanned files (directory-scan mode)."""
        if self._pyctx is not None:
            return self._pyctx.num_items()
        return int(self._table.num_rows)

    # ── Incremental API (role-aware) ──────────────────────────────────

    def add(
        self,
        obj: str | Path | "PILImage.Image",
        *,
        role: _RoleLiteral = "user",
        metadata: dict[str, Any] | None = None,
    ) -> Ref:
        """Attach an item to the context and return its kind-prefixed ref id.

        Accepts:
            - ``str`` — free-form text, inlined under any supported role.
            - :class:`pathlib.Path` pointing to an existing file (``role="user"`` only).
            - :class:`PIL.Image.Image` — held in-memory (``role="user"`` only).

        Args:
            obj: Free-form text, a ``pathlib.Path``, or ``PIL.Image.Image``.
            role: Chat role for this item: ``"system"``, ``"developer"``, or ``"user"``.
            metadata: Optional JSON-serialisable ``dict`` of extra
                context for this item (e.g. ``{"note": "hero shot"}``,
                ``{"summary": "Attention is all you need", "tags":
                ["nlp"]}``). Stable key order is preserved. The
                metadata flows into :meth:`__repr__`, :meth:`to_md`,
                :meth:`print_tree`, and :meth:`to_messages` (emitted as
                a leading text block per item so the VLM sees it
                inline).

        Returns:
            The generated ref id (``<prefix>_<6 hex>``). The return type
            is :data:`mm.refs.Ref` — a typed alias over ``str``.

        Raises:
            RuntimeError: If called on a directory-scan Context.
            ValueError: If ``role`` is invalid or a media object is added
                to a non-user role.
            FileNotFoundError: If ``obj`` is a Path that doesn't exist.
            TypeError: If ``obj`` is not a str, Path, or PIL.Image.Image.
        """
        self._require_pyctx("add")
        role = _validate_role(role)
        kind, source_kind, source_value, byte_len, desc, py_obj = _classify_add_obj(obj)
        if role != "user" and kind != "text":
            raise ValueError("Only free-form string text can use role='system' or role='developer'")
        metadata_json = json.dumps(metadata) if metadata else None

        ref_id: str = self._pyctx.add(
            role,
            kind,
            source_kind,
            source_value,
            byte_len=byte_len,
            desc=desc,
            py_obj=py_obj,
            metadata_json=metadata_json,
        )
        return ref_id

    def remove(self, ref_id: str) -> None:
        """Remove an item from an incremental context by ref id.

        Args:
            ref_id: Bare ref id or a global ``<session_id>/<ref_id>`` whose
                session matches this context.

        Raises:
            RuntimeError: If called on a directory-scan Context.
            RefNotFoundError: If ``ref_id`` is not present.
            ValueError: If a global ref belongs to a different session.
        """
        self._require_pyctx("remove")
        bare = _strip_session_prefix(ref_id, self._session_id)
        self._pyctx.remove(bare)

    def items(self) -> list[dict[str, Any]]:
        """Return all items as dicts (insertion order).

        Each dict has keys ``ref_id``, ``role``, ``kind``, ``source_kind``,
        ``source_value``, ``byte_len``, ``desc``, and ``metadata``.
        """
        self._require_pyctx("items")
        return list(self._pyctx.items())

    def ref_ids(self) -> list[Ref]:
        """All ref ids in insertion order."""
        self._require_pyctx("ref_ids")
        return list(self._pyctx.ref_ids())

    def to_messages(
        self,
        format: _FormatLiteral = "openai",
        *,
        encoders: dict[str, str] | None = None,
        encoder_kwargs: dict[str, dict[str, Any]] | None = None,
    ) -> list[dict[str, Any]]:
        """Encode every item into a role-aware VLM message list.

        Args:
            format: Message shape. ``"openai"`` returns one
                ``chat.completions``-style message per consecutive role
                run. ``"gemini"`` adapts image parts to the
                ``inline_data`` Part shape and folds non-user roles into
                labelled text parts.
            encoders: Per-kind encoder overrides, e.g. ``{"image": "tile",
                "video": "mosaic"}``. Unspecified kinds fall back to the
                defaults: ``resize`` for images, ``mosaic`` for video,
                ``rasterize`` for documents, and ``base64`` for audio.
            encoder_kwargs: Per-kind keyword arguments forwarded to the
                encoder's ``encode()`` method, e.g.
                ``{"document": {"pages_per_message": 8}}``.

        Returns:
            Message dicts for ``client.chat.completions.create`` (OpenAI)
            or the Gemini ``generate_content`` call.

        Raises:
            ValueError: If ``format`` is not ``"openai"`` or ``"gemini"``.
            RuntimeError: If called on a directory-scan Context.
        """
        if format not in ("openai", "gemini"):
            raise ValueError(f"format must be 'openai' or 'gemini', got {format!r}")
        self._require_pyctx("to_messages")

        from mm.refs_messages import build_messages

        return build_messages(
            self, format=format, encoders=encoders or {}, encoder_kwargs=encoder_kwargs or {}
        )

    def to_md(self, mode: Literal["metadata", "fast", "accurate"] = "metadata") -> str:
        """Render a markdown table of every ref + source + content.

        Args:
            mode: ``"metadata"`` populates each row with the metadata-tier content
                (``files.text_preview`` produced by ``extract_meta``; no LLM call).
                ``"fast"`` runs light LLM extraction (requires a
                configured profile; not wired yet — currently raises
                ``NotImplementedError``).
                ``"accurate"`` runs the LLM-backed path (requires a
                configured profile; not wired yet — currently raises
                ``NotImplementedError``).

        Returns:
            Markdown table (headers: ref | kind | source | content).
        """
        self._require_pyctx("to_md")
        if mode in ("fast", "accurate"):
            raise NotImplementedError(
                f"to_md(mode={mode!r}) is not implemented yet. "
                "Use mode='metadata' for the metadata-tier content."
            )
        contents = self._collect_metadata_contents()
        return self._pyctx.to_md_table(contents)

    def print_tree(self, layout: _TreeLayout = "insertion") -> None:
        """Print a rich.Tree view of the context.

        Args:
            layout: Visual layout. Default ``"insertion"``.

                - ``"insertion"``: insertion-order, metadata rendered beneath
                  each item (T4). [implemented]
                - ``"paths"``: directory hierarchy with refs on the right
                  (T1). [TODO]
                - ``"kind"``: grouped by kind: images / documents / videos /
                  … (T2). [TODO]
                - ``"flat"``: ref-first flat list (T3). [TODO — likely
                  better as ``ctx.print_table()``]
                - ``"hybrid"``: paths + per-item dim metadata line (T5).
                  [TODO]

        Raises:
            NotImplementedError: For any non-``"insertion"`` layout.
        """
        self._require_pyctx("print_tree")
        if layout != "insertion":
            raise NotImplementedError(
                f"print_tree(layout={layout!r}) is not implemented yet. "
                "Only layout='insertion' is available in this release."
            )
        from mm.display import output_console

        output_console.print(self._pyctx.render_tree_insertion())

    # ── Unified get (instance + classmethod hybrid) ───────────────────

    get = _HybridGet()  # type: ignore[assignment]  # descriptor assigned below

    def _get_instance(self, ref_id: str) -> Any:
        """Instance ``ctx.get(ref)`` — local lookup.

        - Incremental mode: returns the ``str``, ``Path``, or
          ``PIL.Image.Image`` stored at ``ref_id``.
        - Directory-scan mode: returns the DB row for the global ref.

        Raises:
            RefNotFoundError: Bare ref id not found in the context.
            ValueError: Malformed global ref or mismatched session id.
        """
        bare = _strip_session_prefix(ref_id, self._session_id)
        if self._pyctx is not None:
            return self._pyctx.get(bare)
        return Context._classmethod_get(
            f"{self._session_id}/{bare}" if self._session_id else bare,
            default_session=self._session_id,
            db=self._db,
        )

    @classmethod
    def _classmethod_get(
        cls,
        ref: str,
        *,
        default_session: str | None = None,
        db: MmDatabase | None = None,
    ) -> dict[str, Any] | None:
        """Cross-session DB resolver for persisted contexts."""
        from mm.refs import GlobalRef

        if "/" in ref:
            parsed = GlobalRef.parse(ref)
            sid, rid = parsed.session_id, parsed.ref_id
        elif default_session is not None:
            sid, rid = default_session, ref
        else:
            raise ValueError(
                f"ambiguous ref {ref!r}: pass a global '<session_id>/<ref_id>' "
                "or supply session_id=..."
            )
        if db is None:
            db = MmDatabase()
        return db.get_file_by_ref(sid, rid)

    @staticmethod
    def resolve(
        global_ref: str,
        *,
        db: MmDatabase | None = None,
    ) -> dict[str, Any] | None:
        """Legacy DB resolver kept for backward compatibility.

        New code should prefer :meth:`get` (instance for local lookup,
        classmethod for cross-session DB lookup).
        """
        return Context._classmethod_get(global_ref, db=db)

    # ── Persistence (deferred for role-aware) ─────────────────────────

    def save(self) -> None:
        """Persist the context.

        For directory-scan contexts, writes the Arrow table to the
        global mm DB (existing behaviour).

        For incremental role-aware contexts, this is **not implemented
        yet**. Planned behaviour:

            - Write ``(session_id, ref_id, kind, uri, content_hash, metadata)``
              to the ``files`` table in ``~/.local/share/mm/mm.db``.
            - For in-memory objects, spool to a content-addressed cache
              dir (``~/.local/share/mm/blobs/<xxh3>.<ext>``) and record
              the blob URI.
            - Make ``Context.get("<session>/<ref>")`` resolve via the DB
              across processes.
            - Idempotent on repeat calls for the same
              ``(session_id, ref_id)``.

        Raises:
            NotImplementedError: For incremental role-aware contexts.
        """
        if self._pyctx is not None:
            raise NotImplementedError(
                "Context.save() is not implemented for incremental role-aware contexts yet. "
                "See Context.save docstring for the planned behaviour."
            )
        refs = self._materialize_refs() if self._session_id else None
        assert self.root is not None
        self.db.upsert_files(
            self._table,
            self.root,
            session_id=self._session_id,
            refs=refs,
            scanner=self._scanner,
        )

    # ── Directory-scan API (preserved) ────────────────────────────────

    @property
    def files(self) -> list[FileEntry]:
        """List all files as :class:`FileEntry` (directory-scan mode)."""
        if self._pyctx is not None:
            raise RuntimeError(
                "Context.files is only available on directory-scan contexts. "
                "Use ctx.items() for incremental role-aware contexts."
            )
        rows = self._table.to_pydict()
        result = []
        for i in range(self._table.num_rows):
            row = {col: rows[col][i] for col in rows}
            result.append(FileEntry(row, context=self))
        return result

    def _table_with_refs(self):
        """Return the Arrow table with ``session_id`` + ``ref_id`` appended."""
        import pyarrow as pa

        table = self._table
        if self._session_id is None:
            return table
        ref_map = self._materialize_refs()
        paths = table.column("path").to_pylist()
        ref_ids = [ref_map.get(p) for p in paths]
        sess_col = pa.array([self._session_id] * len(paths), type=pa.string())
        ref_col = pa.array(ref_ids, type=pa.string())
        return table.append_column("session_id", sess_col).append_column("ref_id", ref_col)

    def to_polars(self, *, refs: bool = False):
        """Convert to Polars DataFrame (directory-scan mode)."""
        self._require_table("to_polars")
        from mm.df import arrow_to_polars

        table = self._table_with_refs() if refs else self._table
        return arrow_to_polars(table)

    def to_pandas(self, *, refs: bool = False):
        """Convert to Pandas DataFrame (directory-scan mode)."""
        self._require_table("to_pandas")
        from mm.df import arrow_to_pandas

        table = self._table_with_refs() if refs else self._table
        return arrow_to_pandas(table)

    def to_arrow(self, *, refs: bool = False):
        """Return the underlying PyArrow Table (directory-scan mode)."""
        self._require_table("to_arrow")
        return self._table_with_refs() if refs else self._table

    def sql(self, query: str):
        """Run a SQL query against the directory-scan Arrow table."""
        self._require_table("sql")
        from mm.query import query_arrow_table

        return query_arrow_table(self._table, query)

    def filter(
        self,
        *,
        kind: str | None = None,
        ext: str | list[str] | None = None,
        min_size: str | int | None = None,
        max_size: str | int | None = None,
        modified_after: str | None = None,
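    ) -> Context:
        """Return a new directory-scan Context with filtered rows.

        ``kind`` and ``ext`` accept a single value or a comma-separated
        string (``ext`` also accepts a list). Returns ``self`` unchanged
        when no condition applies. ``modified_after`` is accepted but not
        applied by this implementation.
        """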
        self._require_table("filter")
        conditions: list[str] = []

        if kind:
            if "," in kind:
                kinds = ", ".join(f"'{k.strip()}'" for k in kind.split(","))
                conditions.append(f"kind IN ({kinds})")
            else:
                conditions.append(f"kind = '{kind}'")
        if ext:
            if isinstance(ext, str):
                ext = [e.strip() for e in ext.split(",")]
            ext_list = ", ".join(f"'{e}'" for e in ext)
            conditions.append(f"ext IN ({ext_list})")
        if min_size is not None:
            size_bytes = _parse_size(min_size) if isinstance(min_size, str) else min_size
            conditions.append(f"size >= {size_bytes}")
        if max_size is not None:
            size_bytes = _parse_size(max_size) if isinstance(max_size, str) else max_size
            conditions.append(f"size <= {size_bytes}")

        if not conditions:
            return self

        from mm.query import query_arrow_table

        where_clause = " AND ".join(conditions)
        filtered = query_arrow_table(self._table, f"SELECT * FROM files WHERE {where_clause}")

        new_ctx = object.__new__(Context)
        new_ctx.root = self.root
        new_ctx._llm_base_url = self._llm_base_url
        new_ctx._llm_api_key = self._llm_api_key
        new_ctx._scanner = self._scanner
        new_ctx._table = filtered
        new_ctx._db = self._db
        new_ctx._no_ignore = self._no_ignore
        new_ctx._session_id = self._session_id
        new_ctx._refs_cache = None

        new_ctx._pyctx = None
        return new_ctx

    def cat(self, path: str, *, no_cache: bool = False) -> str:
        """Read locally-extracted (metadata-tier) content of a file (directory-scan mode)."""
        self._require_table("cat")
        assert self.root is not None
        full_path = self.root / path
        from mm.cat_utils.extract_meta import extract_meta
        from mm.utils import file_kind

        kind = file_kind(full_path)
        if kind == "text":
            return full_path.read_text(errors="replace")
        return extract_meta(full_path, kind, no_cache=no_cache)

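    def head(self, path: str, *, n: int = 10) -> str:
        """Return the first ``n`` lines of :meth:`cat` output (directory-scan mode)."""
        content = self.cat(path)
        return "\n".join(content.splitlines()[:n])

    def tail(self, path: str, *, n: int = 10) -> str:
        """Return the last ``n`` lines of :meth:`cat` output (directory-scan mode)."""
        content = self.cat(path)
        lines = content.splitlines()
        # A plain [-n:] slice would return every line when n == 0.
        return "\n".join(lines[-n:] if n != 0 else [])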

    def encode(
        self,
        path: str,
        *,
        strategy: str | None = None,
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        """Encode a file into VLM-ready Message dicts (directory-scan).

        Args:
            path: Relative path within the context root.
            strategy: Registered encoder name (e.g. ``"resize"``).
                If ``None``, defaults to ``resize`` for images,
                ``mosaic`` for video, ``rasterize`` for documents,
                and ``base64`` for audio.
            **kwargs: Forwarded to ``encoder.encode()``.

        Returns:
            List of OpenAI-compatible Message dicts.
        """
        self._require_table("encode")
        from mm.encoders import get as get_encoder
        from mm.refs_messages import OPENAI_DEFAULT_ENCODERS
        from mm.utils import file_kind

        assert self.root is not None
        full_path = self.root / path
        if not full_path.exists():
            raise FileNotFoundError(f"{path} not found in {self.root}")

        kind = file_kind(full_path.name)
        if strategy is None:
            strategy = OPENAI_DEFAULT_ENCODERS.get(kind, "resize")

        strat = get_encoder(strategy, kind)
        return list(strat.encode(full_path, **kwargs))

    def grep(self, pattern: str, *, kind: str | None = None) -> list[dict[str, Any]]:
        """Search for a pattern across scanned files."""
        self._require_table("grep")
        import re

        matches: list[dict[str, Any]] = []
        target = self.filter(kind=kind) if kind else self

        for f in target.files:
            try:
                content = self.cat(f.path)
                for i, line in enumerate(content.splitlines(), 1):
                    if re.search(pattern, line):
                        matches.append({"path": f.path, "line_number": i, "line": line})
            except Exception:
                continue
        return matches

    def show(
        self,
        *,
        limit: int | None = 50,
        columns: list[str] | None = None,
        refs: bool = False,
    ) -> None:
        """Display the index as a Rich table (directory-scan mode)."""
        self._require_table("show")
        from mm.display import arrow_table_to_rich, output_console

        table = self._table_with_refs() if refs else self._table
        if refs and columns is None:
            columns = [c for c in table.column_names if c != "session_id"]
        rich_table = arrow_table_to_rich(table, columns=columns, limit=limit)
        output_console.print(rich_table)

    def info(self) -> None:
        """Display summary statistics as a Rich panel (directory-scan mode)."""
        self._require_table("info")
        import collections

        from mm.display import format_size, info_panel, output_console

        total_size = sum(r.as_py() for r in self._table.column("size"))
        kinds = collections.Counter(r.as_py() for r in self._table.column("kind"))
        exts = collections.Counter(r.as_py() for r in self._table.column("ext"))
        top_exts = exts.most_common(5)

        stats: dict[str, Any] = {
            "Files": self.num_files,
            "Total Size": format_size(total_size),
            "Root": str(self.root),
        }
        for k, count in sorted(kinds.items(), key=lambda x: -x[1]):
            stats[k.capitalize()] = count

        ext_str = ", ".join(f"{e} ({c})" for e, c in top_exts)
        stats["Top Extensions"] = ext_str

        assert self.root is not None
        panel = info_panel(stats, title=self.root.name)
        output_console.print(panel)

    # ── Legacy ref helpers (directory-scan) ──────────────────────────

    def _uri_for(self, path: str) -> str:
        assert self.root is not None
        return f"{self.root}/{path.lstrip('/')}"

    def _require_session(self) -> str:
        if self._session_id is None:
            raise ValueError(
                "Context has no session_id; pass session_id=... to Context() "
                "or use Context.new_session() to enable refs."
            )
        return self._session_id

    def _materialize_refs(self) -> dict[str, str]:
        """Build-or-return cached ``{path: ref_id}`` (directory-scan mode)."""
        if self._refs_cache is not None:
            return self._refs_cache
        session_id = self._require_session()
        from mm.refs import make_ref_id

        paths = self._table.column("path").to_pylist()
        kinds = self._table.column("kind").to_pylist()

        existing: dict[str, str] = {}
        if self._db is not None or MmDatabase.DB_PATH.exists():
            try:
                rows = self.db.list_session_files(session_id)
                assert self.root is not None
                root_s = f"{self.root}/"
                for r in rows:
                    uri = str(r.get("uri") or "")
                    ref = r.get("ref_id")
                    if ref and uri.startswith(root_s):
                        existing[uri[len(root_s) :]] = str(ref)
            except Exception:
                existing = {}

        out: dict[str, str] = {}
        for p, k in zip(paths, kinds):
            out[p] = existing.get(p) or make_ref_id(k or "other")
        self._refs_cache = out
        return out

    def ref_for(self, path: str) -> str:
        """Return the ref id for a scanned ``path`` (directory-scan mode)."""
        self._require_table("ref_for")
        refs = self._materialize_refs()
        if path not in refs:
            raise ValueError(f"{path!r} is not in this context")
        return refs[path]

    def global_ref(self, path: str) -> str:
        """Return ``<session_id>/<ref_id>`` for ``path`` (directory-scan)."""
        self._require_table("global_ref")
        return f"{self._require_session()}/{self.ref_for(path)}"

    @property
    def refs(self) -> dict[str, str]:
        """Mapping of ``path -> global_ref`` for every file (scan mode).

        Returns an empty dict for a scan-mode context without a
        ``session_id``. For role-aware contexts, returns
        ``{ref_id: <session>/<ref>}`` for every stored item.
        """
        if self._pyctx is not None:
            return {r: f"{self._session_id}/{r}" for r in self._pyctx.ref_ids()}
        if self._session_id is None:
            return {}
        local = self._materialize_refs()
        return {p: f"{self._session_id}/{r}" for p, r in local.items()}

    # ── Internals ────────────────────────────────────────────────────

    def _collect_metadata_contents(self) -> dict[str, str]:
        """Extract ``cat``-like content for every role-aware item (``mode="metadata"``)."""
        from mm.cat_utils.extract_meta import extract_meta
        from mm.utils import file_kind

        out: dict[str, str] = {}
        for item in self._pyctx.items():
            ref_id = item["ref_id"]
            src_kind = item["source_kind"]
            src = item["source_value"]
            if src_kind == "in_memory" and item["kind"] == "text":
                out[ref_id] = item.get("desc") or ""
                continue
            if src_kind == "path":
                p = Path(src)
                if not p.exists():
                    continue
                kind = file_kind(p)
                try:
                    out[ref_id] = extract_meta(p, kind)
                except Exception as exc:  # noqa: BLE001
                    out[ref_id] = f"[extract failed: {exc}]"
            # in-memory / url items fall through to the metadata fallback
            # handled by the Rust-side to_md_with_contents.
        return out

    def _require_pyctx(self, method: str) -> None:
        if self._pyctx is None:
            raise RuntimeError(
                f"Context.{method}() requires an incremental role-aware context. "
                "Construct with Context() / Context(session_id=...) (no root)."
            )

    def _require_table(self, method: str) -> None:
        if self._table is None:
            raise RuntimeError(
                f"Context.{method}() requires a directory-scan context. "
                "Construct with Context(root=...)."
            )

    # ── Reprs ────────────────────────────────────────────────────────

    def __repr__(self) -> str:
        if self._pyctx is not None:
            return self._pyctx.repr_markdown()
        sess = f", session='{self._session_id}'" if self._session_id else ""
        return f"Context(root='{self.root}', files={self.num_files}{sess})"

    def render_html(
        self,
        *,
        max_image_width: int = 320,
        title: str | None = None,
        encoders: dict[str, str] | None = None,
        encoder_kwargs: dict[str, dict[str, Any]] | None = None,
    ) -> str:
        """Render the context as rich, self-contained HTML.

        Each item is rendered with its native media view (image, video
        player, audio player, document pages), user-supplied metadata
        dict, and a collapsible section showing the encoded VLM
        representation. Suitable for ``IPython.display.HTML()`` or
        direct embedding.

        Args:
            max_image_width: Maximum rendered image width in pixels.
            title: Optional title bar text. Defaults to an auto-generated
                summary.
            encoders: Per-kind encoder overrides, e.g.
                ``{"image": "tile", "video": "mosaic"}``.
            encoder_kwargs: Per-kind kwargs forwarded to the encoder's
                ``encode()`` method.

        Returns:
            Self-contained HTML string.

        Raises:
            RuntimeError: If called on a directory-scan Context.
        """
        self._require_pyctx("render_html")
        from mm.notebook import render_context

        return render_context(
            self,
            max_image_width=max_image_width,
            title=title,
            encoders=encoders,
            encoder_kwargs=encoder_kwargs,
        )

    def __enter__(self) -> "Context":
        return self

    def __exit__(self, *_: Any) -> None:
        pass

    def __len__(self) -> int:
        return self.num_files

db property

Lazy-initialized global database connection.

files property

List all files as FileEntry objects (directory-scan mode).

refs property

Mapping of path -> global_ref for every file (scan mode).

For role-aware contexts, returns {ref_id: <session>/<ref>} for every stored item.
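
The two mapping shapes can be pictured with a small stand-alone sketch. It does not call `mm`; the helper names `scan_refs` and `incremental_refs`, the session id, and the `img_`/`txt_` ref ids are illustrative assumptions rather than values the library is known to produce.

```python
def scan_refs(session_id, local_refs):
    """Directory-scan shape: relative path -> '<session_id>/<ref_id>'."""
    if session_id is None:
        # Without a session id the property yields an empty mapping.
        return {}
    return {path: f"{session_id}/{ref}" for path, ref in local_refs.items()}


def incremental_refs(session_id, ref_ids):
    """Role-aware shape: ref_id -> '<session_id>/<ref_id>'."""
    return {ref: f"{session_id}/{ref}" for ref in ref_ids}


scan = scan_refs("my-session", {"photo.jpg": "img_a1b2c3"})
inc = incremental_refs("my-session", ["txt_0f9e8d", "img_a1b2c3"])
```

Note that the keys differ between the two modes (paths versus ref ids), so code that iterates `refs` may need to know which kind of context it holds.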

session_id property

External session id attached to this context, if any.

add(obj, *, role='user', metadata=None)

Attach an item to the context and return its kind-prefixed ref id.

Accepts:
  • str: free-form text, inlined under any supported role.
  • pathlib.Path pointing to an existing file (role="user" only).
  • PIL.Image.Image, held in memory (role="user" only).

Parameters:

Name Type Description Default
obj str | Path | 'PILImage.Image'

Free-form text, a pathlib.Path, or PIL.Image.Image.

required
role _RoleLiteral

Chat role for this item: "system", "developer", or "user".

'user'
metadata dict[str, Any] | None

Optional JSON-serialisable dict of extra context for this item (e.g. {"note": "hero shot"}, {"summary": "Attention is all you need", "tags": ["nlp"]}). Stable key order is preserved. The metadata flows into __repr__, to_md, print_tree, and to_messages (emitted as a leading text block per item so the VLM sees it inline).

None

Returns:

Type Description
Ref

The generated ref id (<prefix>_<6 hex>). The return type is mm.refs.Ref, a typed alias over str.

Raises:

Type Description
RuntimeError

If called on a directory-scan Context.

ValueError

If role is invalid or a media object is added to a non-user role.

FileNotFoundError

If obj is a Path that doesn't exist.

TypeError

If obj is not a str, Path, or PIL.Image.Image.
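
The role rule and the metadata handling described above can be sketched without the Rust core. This is a minimal stand-alone illustration, not the library's code path: `check_add` and `ROLES` are hypothetical names, and the exact message raised for an invalid role is an assumption.

```python
import json

ROLES = ("system", "developer", "user")


def check_add(role, kind, metadata=None):
    """Validate a role/kind pair and serialise metadata to JSON (or None)."""
    if role not in ROLES:
        raise ValueError(f"invalid role: {role!r}")
    if role != "user" and kind != "text":
        # Media items (paths, PIL images) are only accepted under role="user".
        raise ValueError("Only free-form string text can use role='system' or role='developer'")
    # A missing or empty dict becomes None; json.dumps keeps key insertion order.
    return json.dumps(metadata) if metadata else None


meta = check_add("user", "image", {"note": "hero shot", "tags": ["nlp"]})
```

One consequence worth knowing: an empty `{}` is treated the same as no metadata at all.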

Source code in mm/context.py
def add(
    self,
    obj: str | Path | "PILImage.Image",
    *,
    role: _RoleLiteral = "user",
    metadata: dict[str, Any] | None = None,
) -> Ref:
    """Attach an item to the context and return its kind-prefixed ref id.

    Accepts:
        - ``str`` — free-form text, inlined under any supported role.
        - :class:`pathlib.Path` pointing to an existing file (``role="user"`` only).
        - :class:`PIL.Image.Image` — held in-memory (``role="user"`` only).

    Args:
        obj: Free-form text, a ``pathlib.Path``, or ``PIL.Image.Image``.
        role: Chat role for this item: ``"system"``, ``"developer"``, or ``"user"``.
        metadata: Optional JSON-serialisable ``dict`` of extra
            context for this item (e.g. ``{"note": "hero shot"}``,
            ``{"summary": "Attention is all you need", "tags":
            ["nlp"]}``). Stable key order is preserved. The
            metadata flows into :meth:`__repr__`, :meth:`to_md`,
            :meth:`print_tree`, and :meth:`to_messages` (emitted as
            a leading text block per item so the VLM sees it
            inline).

    Returns:
        The generated ref id (``<prefix>_<6 hex>``). The return type
        is :data:`mm.refs.Ref` — a typed alias over ``str``.

    Raises:
        RuntimeError: If called on a directory-scan Context.
        ValueError: If ``role`` is invalid or a media object is added
            to a non-user role.
        FileNotFoundError: If ``obj`` is a Path that doesn't exist.
        TypeError: If ``obj`` is not a str, Path, or PIL.Image.Image.
    """
    self._require_pyctx("add")
    role = _validate_role(role)
    kind, source_kind, source_value, byte_len, desc, py_obj = _classify_add_obj(obj)
    if role != "user" and kind != "text":
        raise ValueError("Only free-form string text can use role='system' or role='developer'")
    metadata_json = json.dumps(metadata) if metadata else None

    ref_id: str = self._pyctx.add(
        role,
        kind,
        source_kind,
        source_value,
        byte_len=byte_len,
        desc=desc,
        py_obj=py_obj,
        metadata_json=metadata_json,
    )
    return ref_id

cat(path, *, no_cache=False)

Read locally-extracted (metadata-tier) content of a file (directory-scan mode).
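
For files classified as text, the method returns the file contents with undecodable bytes replaced rather than raising. The snippet below illustrates that standard-library behaviour on its own; the temporary file and the explicit `encoding="utf-8"` are assumptions made so the result is platform-independent (the method itself relies on the default encoding).

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "notes.txt"
    # The trailing 0xFF byte is not valid UTF-8.
    sample.write_bytes(b"ok \xff")
    # errors="replace" substitutes U+FFFD instead of raising UnicodeDecodeError.
    text = sample.read_text(encoding="utf-8", errors="replace")
```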

Source code in mm/context.py
def cat(self, path: str, *, no_cache: bool = False) -> str:
    """Read locally-extracted (metadata-tier) content of a file (directory-scan mode)."""
    self._require_table("cat")
    assert self.root is not None
    full_path = self.root / path
    from mm.cat_utils.extract_meta import extract_meta
    from mm.utils import file_kind

    kind = file_kind(full_path)
    if kind == "text":
        return full_path.read_text(errors="replace")
    return extract_meta(full_path, kind, no_cache=no_cache)

encode(path, *, strategy=None, **kwargs)

Encode a file into VLM-ready Message dicts (directory-scan).

Parameters:

Name Type Description Default
path str

Relative path within the context root.

required
strategy str | None

Registered encoder name (e.g. "resize"). If None, defaults to resize for images, mosaic for video, rasterize for documents, and base64 for audio.

None
**kwargs Any

Forwarded to encoder.encode().

{}

Returns:

Type Description
list[dict[str, Any]]

List of OpenAI-compatible Message dicts.
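
The default selection when strategy is None can be sketched as a plain dictionary lookup. This is an assumption-laden illustration: `DEFAULT_ENCODERS` and `pick_strategy` are hypothetical names, and the kind keys (in particular `"audio"`) are inferred from the parameter description rather than taken from the library's own table.

```python
# Per-kind defaults as described for strategy=None; key names are assumed.
DEFAULT_ENCODERS = {
    "image": "resize",
    "video": "mosaic",
    "document": "rasterize",
    "audio": "base64",
}


def pick_strategy(kind, strategy=None):
    """Return the explicit strategy, else the per-kind default, else 'resize'."""
    if strategy is not None:
        return strategy
    return DEFAULT_ENCODERS.get(kind, "resize")


chosen = [pick_strategy("video"), pick_strategy("image", "tile"), pick_strategy("other")]
```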

Source code in mm/context.py
def encode(
    self,
    path: str,
    *,
    strategy: str | None = None,
    **kwargs: Any,
) -> list[dict[str, Any]]:
    """Encode a file into VLM-ready Message dicts (directory-scan).

    Args:
        path: Relative path within the context root.
        strategy: Registered encoder name (e.g. ``"resize"``).
            If ``None``, defaults to ``resize`` for images,
            ``mosaic`` for video, ``rasterize`` for documents,
            and ``base64`` for audio.
        **kwargs: Forwarded to ``encoder.encode()``.

    Returns:
        List of OpenAI-compatible Message dicts.
    """
    self._require_table("encode")
    from mm.encoders import get as get_encoder
    from mm.refs_messages import OPENAI_DEFAULT_ENCODERS
    from mm.utils import file_kind

    assert self.root is not None
    full_path = self.root / path
    if not full_path.exists():
        raise FileNotFoundError(f"{path} not found in {self.root}")

    kind = file_kind(full_path.name)
    if strategy is None:
        strategy = OPENAI_DEFAULT_ENCODERS.get(kind, "resize")

    strat = get_encoder(strategy, kind)
    return list(strat.encode(full_path, **kwargs))

filter(*, kind=None, ext=None, min_size=None, max_size=None, modified_after=None)

Return a new directory-scan Context with filtered rows.
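
The keyword filters are combined into a single SQL WHERE clause. A stand-alone sketch of that assembly follows; `build_where` is a hypothetical helper, and it takes integer sizes only because the behaviour of the size-string parser is not shown here.

```python
def build_where(kind=None, ext=None, min_size=None, max_size=None):
    """Assemble a WHERE clause from keyword filters (integer sizes only)."""
    conditions = []
    if kind:
        if "," in kind:
            # "image,video" becomes an IN list.
            kinds = ", ".join(f"'{k.strip()}'" for k in kind.split(","))
            conditions.append(f"kind IN ({kinds})")
        else:
            conditions.append(f"kind = '{kind}'")
    if ext:
        if isinstance(ext, str):
            ext = [e.strip() for e in ext.split(",")]
        ext_list = ", ".join(f"'{e}'" for e in ext)
        conditions.append(f"ext IN ({ext_list})")
    if min_size is not None:
        conditions.append(f"size >= {min_size}")
    if max_size is not None:
        conditions.append(f"size <= {max_size}")
    return " AND ".join(conditions)


where = build_where(kind="image,video", ext="jpg, png", min_size=1024)
```

Because the values are interpolated directly into the query text, this style of filter is best fed with trusted input only.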

Source code in mm/context.py
def filter(
    self,
    *,
    kind: str | None = None,
    ext: str | list[str] | None = None,
    min_size: str | int | None = None,
    max_size: str | int | None = None,
    modified_after: str | None = None,
) -> Context:
    """Return a new directory-scan Context with filtered rows."""
    self._require_table("filter")
    conditions: list[str] = []

    if kind:
        if "," in kind:
            kinds = ", ".join(f"'{k.strip()}'" for k in kind.split(","))
            conditions.append(f"kind IN ({kinds})")
        else:
            conditions.append(f"kind = '{kind}'")
    if ext:
        if isinstance(ext, str):
            ext = [e.strip() for e in ext.split(",")]
        ext_list = ", ".join(f"'{e}'" for e in ext)
        conditions.append(f"ext IN ({ext_list})")
    if min_size is not None:
        size_bytes = _parse_size(min_size) if isinstance(min_size, str) else min_size
        conditions.append(f"size >= {size_bytes}")
    if max_size is not None:
        size_bytes = _parse_size(max_size) if isinstance(max_size, str) else max_size
        conditions.append(f"size <= {size_bytes}")
    # NOTE: ``modified_after`` is accepted but no condition is built for it here.

    if not conditions:
        return self

    from mm.query import query_arrow_table

    where_clause = " AND ".join(conditions)
    filtered = query_arrow_table(self._table, f"SELECT * FROM files WHERE {where_clause}")

    new_ctx = object.__new__(Context)
    new_ctx.root = self.root
    new_ctx._llm_base_url = self._llm_base_url
    new_ctx._llm_api_key = self._llm_api_key
    new_ctx._scanner = self._scanner
    new_ctx._table = filtered
    new_ctx._db = self._db
    new_ctx._no_ignore = self._no_ignore
    new_ctx._session_id = self._session_id
    new_ctx._refs_cache = None

    new_ctx._pyctx = None
    return new_ctx

global_ref(path)

Return <session_id>/<ref_id> for path (directory-scan).

Source code in mm/context.py
def global_ref(self, path: str) -> str:
    """Return ``<session_id>/<ref_id>`` for ``path`` (directory-scan)."""
    self._require_table("global_ref")
    return f"{self._require_session()}/{self.ref_for(path)}"

grep(pattern, *, kind=None)

Search for a pattern across scanned files.
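
Each match is reported as a dict with the file path, a 1-based line number, and the matching line. The sketch below reproduces that shape over an in-memory mapping; `grep_contents` and the sample texts are hypothetical stand-ins for the scanned files.

```python
import re


def grep_contents(contents, pattern):
    """Search each text line by line and report 1-based line numbers."""
    regex = re.compile(pattern)
    matches = []
    for path, text in contents.items():
        for number, line in enumerate(text.splitlines(), 1):
            if regex.search(line):
                matches.append({"path": path, "line_number": number, "line": line})
    return matches


hits = grep_contents({"a.txt": "alpha\nbeta 42\n", "b.txt": "gamma 7"}, r"\d+")
```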

Source code in mm/context.py
def grep(self, pattern: str, *, kind: str | None = None) -> list[dict[str, Any]]:
    """Search for a pattern across scanned files."""
    self._require_table("grep")
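    import re

    # Compile once so an invalid pattern raises here instead of being
    # swallowed by the per-file ``except`` below.
    regex = re.compile(pattern)
    matches: list[dict[str, Any]] = []
    target = self.filter(kind=kind) if kind else self

    for f in target.files:
        try:
            content = self.cat(f.path)
            for i, line in enumerate(content.splitlines(), 1):
                if regex.search(line):
                    matches.append({"path": f.path, "line_number": i, "line": line})
        except Exception:
            # Skip files whose content cannot be extracted.
            continue
    return matches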

info()

Display summary statistics as a Rich panel (directory-scan mode).
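
The statistics behind the panel are simple aggregates over the size, kind, and ext columns. A minimal sketch of the same aggregation on plain dicts; the `rows` sample is invented for illustration, and the Rich rendering is left out.

```python
from collections import Counter

# Hypothetical rows standing in for the scanned metadata table.
rows = [
    {"kind": "image", "ext": "jpg", "size": 2048},
    {"kind": "image", "ext": "jpg", "size": 1024},
    {"kind": "document", "ext": "pdf", "size": 4096},
]

total_size = sum(r["size"] for r in rows)
kinds = Counter(r["kind"] for r in rows)
# Keep only the five most frequent extensions, as the panel does.
top_exts = Counter(r["ext"] for r in rows).most_common(5)
ext_str = ", ".join(f"{e} ({c})" for e, c in top_exts)
```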

Source code in mm/context.py
def info(self) -> None:
    """Display summary statistics as a Rich panel (directory-scan mode)."""
    self._require_table("info")
    import collections

    from mm.display import format_size, info_panel, output_console

    total_size = sum(r.as_py() for r in self._table.column("size"))
    kinds = collections.Counter(r.as_py() for r in self._table.column("kind"))
    exts = collections.Counter(r.as_py() for r in self._table.column("ext"))
    top_exts = exts.most_common(5)

    stats: dict[str, Any] = {
        "Files": self.num_files,
        "Total Size": format_size(total_size),
        "Root": str(self.root),
    }
    for k, count in sorted(kinds.items(), key=lambda x: -x[1]):
        stats[k.capitalize()] = count

    ext_str = ", ".join(f"{e} ({c})" for e, c in top_exts)
    stats["Top Extensions"] = ext_str

    assert self.root is not None
    panel = info_panel(stats, title=self.root.name)
    output_console.print(panel)

items()

Return all items as dicts (insertion order).

Each dict has keys ref_id, role, kind, source_kind, source_value, byte_len, desc, and metadata.

Source code in mm/context.py
def items(self) -> list[dict[str, Any]]:
    """Return all items as dicts (insertion order).

    Each dict has keys ``ref_id``, ``role``, ``kind``, ``source_kind``,
    ``source_value``, ``byte_len``, ``desc``, and ``metadata``.
    """
    self._require_pyctx("items")
    return list(self._pyctx.items())

new_session(root=None, **kwargs) classmethod

Create a Context with a freshly minted session id.

Convenience for callers who want refs but don't have a session id from an upstream system. When root is given (the legacy signature), the id is a UUIDv4 for compatibility; without root (incremental usage), it is a UUIDv7 minted via uuid7.

Source code in mm/context.py
@classmethod
def new_session(
    cls,
    root: str | Path | None = None,
    **kwargs: Any,
) -> Context:
    """Create a Context with a freshly minted session id.

    Convenience for callers who want refs but don't have a session
    id from an upstream system. Legacy signature uses UUIDv4 for
    compatibility; new incremental usage uses UUIDv7 via :func:`uuid7`.
    """
    if root is None:
        return cls(session_id=uuid7(), **kwargs)
    return cls(root, session_id=new_session_id(), **kwargs)

print_tree(layout='insertion')

Print a rich.Tree view of the context.

Parameters:

Name Type Description Default
layout _TreeLayout

Visual layout. Default "insertion".

  • "insertion": insertion-order, metadata rendered beneath each item (T4). [implemented]
  • "paths": directory hierarchy with refs on the right (T1). [TODO]
  • "kind": grouped by kind: images / documents / videos / … (T2). [TODO]
  • "flat": ref-first flat list (T3). [TODO — likely better as ctx.print_table()]
  • "hybrid": paths + per-item dim metadata line (T5). [TODO]

'insertion'

Raises:

Type Description
NotImplementedError

For any non-"insertion" layout.

Source code in mm/context.py
def print_tree(self, layout: _TreeLayout = "insertion") -> None:
    """Print a rich.Tree view of the context.

    Args:
        layout: Visual layout. Default ``"insertion"``.

            - ``"insertion"``: insertion-order, metadata rendered beneath
              each item (T4). [implemented]
            - ``"paths"``: directory hierarchy with refs on the right
              (T1). [TODO]
            - ``"kind"``: grouped by kind: images / documents / videos /
              … (T2). [TODO]
            - ``"flat"``: ref-first flat list (T3). [TODO — likely
              better as ``ctx.print_table()``]
            - ``"hybrid"``: paths + per-item dim metadata line (T5).
              [TODO]

    Raises:
        NotImplementedError: For any non-``"insertion"`` layout.
    """
    self._require_pyctx("print_tree")
    if layout != "insertion":
        raise NotImplementedError(
            f"print_tree(layout={layout!r}) is not implemented yet. "
            "Only layout='insertion' is available in this release."
        )
    from mm.display import output_console

    output_console.print(self._pyctx.render_tree_insertion())

ref_for(path)

Return the ref id for a scanned path (directory-scan mode).

Source code in mm/context.py
def ref_for(self, path: str) -> str:
    """Return the ref id for a scanned ``path`` (directory-scan mode)."""
    self._require_table("ref_for")
    refs = self._materialize_refs()
    if path not in refs:
        raise ValueError(f"{path!r} is not in this context")
    return refs[path]

ref_ids()

All ref ids in insertion order.

Source code in mm/context.py
def ref_ids(self) -> list[Ref]:
    """All ref ids in insertion order."""
    self._require_pyctx("ref_ids")
    return list(self._pyctx.ref_ids())

remove(ref_id)

Remove an item from an incremental context by ref id.

Parameters:

Name Type Description Default
ref_id str

Bare ref id or a global <session_id>/<ref_id> whose session matches this context.

required

Raises:

Type Description
RuntimeError

If called on a directory-scan Context.

RefNotFoundError

If ref_id is not present.

ValueError

If a global ref belongs to a different session.
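
The accepted ref forms can be illustrated with a small helper that normalises either form to a bare ref id. The library's own prefix-stripping helper is not shown on this page, so `strip_session_prefix` below is a hypothetical reimplementation of the documented contract, and the ref ids are made up.

```python
def strip_session_prefix(ref_id, session_id):
    """Return the bare ref id, rejecting global refs from another session."""
    if "/" not in ref_id:
        return ref_id  # already a bare ref id
    session, _, bare = ref_id.rpartition("/")
    if session != session_id:
        raise ValueError(f"{ref_id!r} belongs to session {session!r}, not {session_id!r}")
    return bare


bare_a = strip_session_prefix("img_a1b2c3", "my-session")
bare_b = strip_session_prefix("my-session/img_a1b2c3", "my-session")
```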

Source code in mm/context.py
def remove(self, ref_id: str) -> None:
    """Remove an item from an incremental context by ref id.

    Args:
        ref_id: Bare ref id or a global ``<session_id>/<ref_id>`` whose
            session matches this context.

    Raises:
        RuntimeError: If called on a directory-scan Context.
        RefNotFoundError: If ``ref_id`` is not present.
        ValueError: If a global ref belongs to a different session.
    """
    self._require_pyctx("remove")
    bare = _strip_session_prefix(ref_id, self._session_id)
    self._pyctx.remove(bare)

render_html(*, max_image_width=320, title=None, encoders=None, encoder_kwargs=None)

Render the context as rich, self-contained HTML.

Each item is rendered with its native media view (image, video player, audio player, document pages), user-supplied metadata dict, and a collapsible section showing the encoded VLM representation. Suitable for IPython.display.HTML() or direct embedding.

Parameters:

Name Type Description Default
max_image_width int

Maximum rendered image width in pixels.

320
title str | None

Optional title bar text. Defaults to an auto-generated summary.

None
encoders dict[str, str] | None

Per-kind encoder overrides, e.g. {"image": "tile", "video": "mosaic"}.

None
encoder_kwargs dict[str, dict[str, Any]] | None

Per-kind kwargs forwarded to the encoder's encode() method.

None

Returns:

Type Description
str

Self-contained HTML string.

Raises:

Type Description
RuntimeError

If called on a directory-scan Context.

Source code in mm/context.py
def render_html(
    self,
    *,
    max_image_width: int = 320,
    title: str | None = None,
    encoders: dict[str, str] | None = None,
    encoder_kwargs: dict[str, dict[str, Any]] | None = None,
) -> str:
    """Render the context as rich, self-contained HTML.

    Each item is rendered with its native media view (image, video
    player, audio player, document pages), user-supplied metadata
    dict, and a collapsible section showing the encoded VLM
    representation. Suitable for ``IPython.display.HTML()`` or
    direct embedding.

    Args:
        max_image_width: Maximum rendered image width in pixels.
        title: Optional title bar text. Defaults to an auto-generated
            summary.
        encoders: Per-kind encoder overrides, e.g.
            ``{"image": "tile", "video": "mosaic"}``.
        encoder_kwargs: Per-kind kwargs forwarded to the encoder's
            ``encode()`` method.

    Returns:
        Self-contained HTML string.

    Raises:
        RuntimeError: If called on a directory-scan Context.
    """
    self._require_pyctx("render_html")
    from mm.notebook import render_context

    return render_context(
        self,
        max_image_width=max_image_width,
        title=title,
        encoders=encoders,
        encoder_kwargs=encoder_kwargs,
    )

resolve(global_ref, *, db=None) staticmethod

Legacy DB resolver kept for backward compatibility.

New code should prefer get (instance method for local lookup, classmethod for cross-session DB lookup).

Source code in mm/context.py
@staticmethod
def resolve(
    global_ref: str,
    *,
    db: MmDatabase | None = None,
) -> dict[str, Any] | None:
    """Legacy DB resolver kept for backward compatibility.

    New code should prefer :meth:`get` (instance for local lookup,
    classmethod for cross-session DB lookup).
    """
    return Context._classmethod_get(global_ref, db=db)

save()

Persist the context.

For directory-scan contexts, writes the Arrow table to the global mm DB (existing behaviour).

For incremental role-aware contexts, this is not implemented yet. Planned behaviour:

  • Write (session_id, ref_id, kind, uri, content_hash, metadata) to the files table in ~/.local/share/mm/mm.db.
  • For in-memory objects, spool to a content-addressed cache dir (~/.local/share/mm/blobs/<xxh3>.<ext>) and record the blob URI.
  • Make Context.get("<session>/<ref>") resolve via the DB across processes.
  • Idempotent on repeat calls for the same (session_id, ref_id).

Raises:

Type Description
NotImplementedError

For incremental role-aware contexts.
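
The planned idempotency on (session_id, ref_id) could look roughly like the following. This is only a sketch of the idea: it uses an in-memory `sqlite3` database as a stand-in (the storage engine behind mm.db is not stated here), and the column types, `save_row` helper, and sample row are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE files (
        session_id TEXT, ref_id TEXT, kind TEXT, uri TEXT,
        content_hash TEXT, metadata TEXT,
        PRIMARY KEY (session_id, ref_id)
    )
    """
)


def save_row(row):
    """Insert a row, or overwrite the existing one with the same key."""
    conn.execute("INSERT OR REPLACE INTO files VALUES (?, ?, ?, ?, ?, ?)", row)


row = ("my-session", "img_a1b2c3", "image", "file:///tmp/photo.jpg", "abc123", None)
save_row(row)
save_row(row)  # a repeat call leaves a single row for this key
count = conn.execute("SELECT COUNT(*) FROM files").fetchone()[0]
```

Keying the table on the pair is what makes repeated saves safe; a content hash alone would not distinguish the same file added under two refs.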

Source code in mm/context.py
def save(self) -> None:
    """Persist the context.

    For directory-scan contexts, writes the Arrow table to the
    global mm DB (existing behaviour).

    For incremental role-aware contexts, this is **not implemented
    yet**. Planned behaviour:

        - Write ``(session_id, ref_id, kind, uri, content_hash, metadata)``
          to the ``files`` table in ``~/.local/share/mm/mm.db``.
        - For in-memory objects, spool to a content-addressed cache
          dir (``~/.local/share/mm/blobs/<xxh3>.<ext>``) and record
          the blob URI.
        - Make ``Context.get("<session>/<ref>")`` resolve via the DB
          across processes.
        - Idempotent on repeat calls for the same
          ``(session_id, ref_id)``.

    Raises:
        NotImplementedError: For incremental role-aware contexts.
    """
    if self._pyctx is not None:
        raise NotImplementedError(
            "Context.save() is not implemented for incremental role-aware contexts yet. "
            "See Context.save docstring for the planned behaviour."
        )
    refs = self._materialize_refs() if self._session_id else None
    assert self.root is not None
    self.db.upsert_files(
        self._table,
        self.root,
        session_id=self._session_id,
        refs=refs,
        scanner=self._scanner,
    )

show(*, limit=50, columns=None, refs=False)

Display the index as a Rich table (directory-scan mode).

Source code in mm/context.py
def show(
    self,
    *,
    limit: int | None = 50,
    columns: list[str] | None = None,
    refs: bool = False,
) -> None:
    """Display the index as a Rich table (directory-scan mode)."""
    self._require_table("show")
    from mm.display import arrow_table_to_rich, output_console

    table = self._table_with_refs() if refs else self._table
    if refs and columns is None:
        columns = [c for c in table.column_names if c != "session_id"]
    rich_table = arrow_table_to_rich(table, columns=columns, limit=limit)
    output_console.print(rich_table)

sql(query)

Run a SQL query against the directory-scan Arrow table.

Source code in mm/context.py
def sql(self, query: str):
    """Run a SQL query against the directory-scan Arrow table."""
    self._require_table("sql")
    from mm.query import query_arrow_table

    return query_arrow_table(self._table, query)

to_arrow(*, refs=False)

Return the underlying PyArrow Table (directory-scan mode).

Source code in mm/context.py
def to_arrow(self, *, refs: bool = False):
    """Return the underlying PyArrow Table (directory-scan mode)."""
    self._require_table("to_arrow")
    return self._table_with_refs() if refs else self._table

to_md(mode='metadata')

Render a markdown table of every ref + source + content.

Parameters:

Name Type Description Default
mode Literal['metadata', 'fast', 'accurate']

"metadata" populates each row with the metadata-tier content (files.text_preview produced by extract_meta; no LLM call). "fast" (light LLM extraction) and "accurate" (the LLM-backed path) both require a configured profile and are not wired yet; each currently raises NotImplementedError.

'metadata'

Returns:

Type Description
str

Markdown table (headers: ref | kind | source | content).
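
The output shape can be approximated in a few lines of Python. The real table is rendered by the Rust core, so the exact cell formatting, the pipe escaping, and the use of source_kind for the source column are assumptions; `to_md_table` here is a hypothetical stand-in.

```python
def to_md_table(items, contents):
    """Render ref | kind | source | content rows as a markdown table."""
    lines = ["| ref | kind | source | content |", "| --- | --- | --- | --- |"]
    for item in items:
        content = contents.get(item["ref_id"], "")
        # Escape pipes and flatten newlines so each item stays on one row.
        cell = content.replace("|", "\\|").replace("\n", " ")
        lines.append(f"| {item['ref_id']} | {item['kind']} | {item['source_kind']} | {cell} |")
    return "\n".join(lines)


items = [{"ref_id": "txt_0f9e8d", "kind": "text", "source_kind": "in_memory"}]
table = to_md_table(items, {"txt_0f9e8d": "Describe\nthis image."})
```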

Source code in mm/context.py
def to_md(self, mode: Literal["metadata", "fast", "accurate"] = "metadata") -> str:
    """Render a markdown table of every ref + source + content.

    Args:
        mode: ``"metadata"`` populates each row with the metadata-tier content
            (``files.text_preview`` produced by ``extract_meta``; no LLM call).
            ``"fast"`` runs light LLM extraction (requires a
            configured profile; not wired yet — currently raises
            ``NotImplementedError``).
            ``"accurate"`` runs the LLM-backed path (requires a
            configured profile; not wired yet — currently raises
            ``NotImplementedError``).

    Returns:
        Markdown table (headers: ref | kind | source | content).
    """
    self._require_pyctx("to_md")
    if mode in ("fast", "accurate"):
        raise NotImplementedError(
            f"to_md(mode={mode!r}) is not implemented yet. "
            "Use mode='metadata' for the metadata-tier content."
        )
    contents = self._collect_metadata_contents()
    return self._pyctx.to_md_table(contents)

to_messages(format='openai', *, encoders=None, encoder_kwargs=None)

Encode every item into a role-aware VLM message list.

Parameters:

Name Type Description Default
format _FormatLiteral

Message shape. "openai" returns one chat.completions-style message per consecutive role run. "gemini" adapts image parts to the inline_data Part shape and folds non-user roles into labelled text parts.

'openai'
encoders dict[str, str] | None

Per-kind encoder overrides, e.g. {"image": "tile", "video": "mosaic"}. Unspecified kinds fall back to sensible defaults (resize, mosaic, rasterize, base64).

None
encoder_kwargs dict[str, dict[str, Any]] | None

Per-kind keyword arguments forwarded to the encoder's encode() method, e.g. {"document": {"pages_per_message": 8}}.

None

Returns:

Type Description
list[dict[str, Any]]

Message dicts for client.chat.completions.create (OpenAI) or the Gemini generate_content call.

Raises:

Type Description
ValueError

If format is not "openai" or "gemini".

RuntimeError

If called on a directory-scan Context.
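
The "one message per consecutive role run" grouping for the "openai" format can be sketched with `itertools.groupby`. This is a text-only illustration under stated assumptions: `group_role_runs` and the `text` key are hypothetical, and image encoding, metadata blocks, and the Gemini shape are omitted.

```python
from itertools import groupby


def group_role_runs(items):
    """Fold consecutive items that share a role into one message."""
    messages = []
    for role, run in groupby(items, key=lambda item: item["role"]):
        parts = [{"type": "text", "text": item["text"]} for item in run]
        messages.append({"role": role, "content": parts})
    return messages


items = [
    {"role": "system", "text": "You are concise."},
    {"role": "user", "text": "Describe this image."},
    {"role": "user", "text": "Focus on colours."},
    {"role": "system", "text": "Reply in English."},
]
messages = group_role_runs(items)
```

Grouping only adjacent items means a role that reappears later starts a new message, which preserves insertion order in the resulting conversation.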

Source code in mm/context.py
def to_messages(
    self,
    format: _FormatLiteral = "openai",
    *,
    encoders: dict[str, str] | None = None,
    encoder_kwargs: dict[str, dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
    """Encode every item into a role-aware VLM message list.

    Args:
        format: Message shape. ``"openai"`` returns one
            ``chat.completions``-style message per consecutive role
            run. ``"gemini"`` adapts image parts to the
            ``inline_data`` Part shape and folds non-user roles into
            labelled text parts.
        encoders: Per-kind encoder overrides, e.g. ``{"image": "tile",
            "video": "mosaic"}``. Unspecified kinds fall back to
            sensible defaults (``resize``, ``mosaic``, ``rasterize``, ``base64``).
        encoder_kwargs: Per-kind keyword arguments forwarded to the
            encoder's ``encode()`` method, e.g.
            ``{"document": {"pages_per_message": 8}}``.

    Returns:
        Message dicts for ``client.chat.completions.create`` (OpenAI)
        or the Gemini ``generate_content`` call.

    Raises:
        ValueError: If ``format`` is not ``"openai"`` or ``"gemini"``.
        RuntimeError: If called on a directory-scan Context.
    """
    if format not in ("openai", "gemini"):
        raise ValueError(f"format must be 'openai' or 'gemini', got {format!r}")
    self._require_pyctx("to_messages")

    from mm.refs_messages import build_messages

    return build_messages(
        self, format=format, encoders=encoders or {}, encoder_kwargs=encoder_kwargs or {}
    )

to_pandas(*, refs=False)

Convert to Pandas DataFrame (directory-scan mode).

Source code in mm/context.py
def to_pandas(self, *, refs: bool = False):
    """Convert to Pandas DataFrame (directory-scan mode)."""
    self._require_table("to_pandas")
    from mm.df import arrow_to_pandas

    table = self._table_with_refs() if refs else self._table
    return arrow_to_pandas(table)

to_polars(*, refs=False)

Convert to Polars DataFrame (directory-scan mode).

Source code in mm/context.py
def to_polars(self, *, refs: bool = False):
    """Convert to Polars DataFrame (directory-scan mode)."""
    self._require_table("to_polars")
    from mm.df import arrow_to_polars

    table = self._table_with_refs() if refs else self._table
    return arrow_to_polars(table)